1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.sctp;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInitializer;
25 import io.netty.channel.SimpleChannelInboundHandler;
26 import io.netty.channel.sctp.SctpChannel;
27 import io.netty.handler.codec.sctp.SctpInboundByteStreamHandler;
28 import io.netty.handler.codec.sctp.SctpMessageCompletionHandler;
29 import io.netty.handler.codec.sctp.SctpOutboundByteStreamHandler;
30 import io.netty.testsuite.util.TestUtils;
31 import org.junit.jupiter.api.Test;
32
33 import java.io.IOException;
34 import java.util.Random;
35 import java.util.concurrent.atomic.AtomicReference;
36 import org.junit.jupiter.api.TestInfo;
37
38 import static io.netty.testsuite.transport.TestsuitePermutation.randomBufferType;
39 import static org.junit.jupiter.api.Assertions.assertEquals;
40 import static org.junit.jupiter.api.Assumptions.assumeTrue;
41
42 public class SctpEchoTest extends AbstractSctpTest {
43
44 private static final Random random = new Random();
45 static final byte[] data = new byte[4096];
46
47 static {
48 random.nextBytes(data);
49 }
50
51 @Test
52 public void testSimpleEcho(TestInfo testInfo) throws Throwable {
53 assumeTrue(TestUtils.isSctpSupported());
54 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
55 @Override
56 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
57 testSimpleEcho(serverBootstrap, bootstrap);
58 }
59 });
60 }
61
62 public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
63 testSimpleEcho0(sb, cb, false);
64 }
65
66 @Test
67 public void testSimpleEchoUnordered(TestInfo testInfo) throws Throwable {
68 assumeTrue(TestUtils.isSctpSupported());
69 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
70 @Override
71 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
72 testSimpleEchoUnordered(serverBootstrap, bootstrap);
73 }
74 });
75 }
76
77 public void testSimpleEchoUnordered(ServerBootstrap sb, Bootstrap cb) throws Throwable {
78 testSimpleEcho0(sb, cb, true);
79 }
80
81 private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, final boolean unordered) throws Throwable {
82 final EchoHandler sh = new EchoHandler();
83 final EchoHandler ch = new EchoHandler();
84
85 sb.childHandler(new ChannelInitializer<SctpChannel>() {
86 @Override
87 public void initChannel(SctpChannel c) throws Exception {
88 c.pipeline().addLast(
89 new SctpMessageCompletionHandler(),
90 new SctpInboundByteStreamHandler(0, 0),
91 new SctpOutboundByteStreamHandler(0, 0, unordered),
92 sh);
93 }
94 });
95 cb.handler(new ChannelInitializer<SctpChannel>() {
96 @Override
97 public void initChannel(SctpChannel c) throws Exception {
98 c.pipeline().addLast(
99 new SctpMessageCompletionHandler(),
100 new SctpInboundByteStreamHandler(0, 0),
101 new SctpOutboundByteStreamHandler(0, 0, unordered),
102 ch);
103 }
104 });
105
106 Channel sc = sb.bind().sync().channel();
107 Channel cc = cb.connect(sc.localAddress()).sync().channel();
108
109 for (int i = 0; i < data.length;) {
110 int length = Math.min(random.nextInt(1024 * 64), data.length - i);
111 cc.writeAndFlush(Unpooled.wrappedBuffer(data, i, length));
112 i += length;
113 }
114
115 while (ch.counter < data.length) {
116 if (sh.exception.get() != null) {
117 break;
118 }
119 if (ch.exception.get() != null) {
120 break;
121 }
122
123 Thread.sleep(50);
124 }
125
126 while (sh.counter < data.length) {
127 if (sh.exception.get() != null) {
128 break;
129 }
130 if (ch.exception.get() != null) {
131 break;
132 }
133
134 Thread.sleep(50);
135 }
136
137 sh.channel.close().sync();
138 ch.channel.close().sync();
139 sc.close().sync();
140
141 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
142 throw sh.exception.get();
143 }
144 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
145 throw ch.exception.get();
146 }
147 if (sh.exception.get() != null) {
148 throw sh.exception.get();
149 }
150 if (ch.exception.get() != null) {
151 throw ch.exception.get();
152 }
153 }
154
155 private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
156 volatile Channel channel;
157 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
158 volatile int counter;
159
160 @Override
161 public void channelActive(ChannelHandlerContext ctx) throws Exception {
162 channel = ctx.channel();
163 }
164
165 @Override
166 public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
167 byte[] actual = new byte[in.readableBytes()];
168 in.readBytes(actual);
169
170 int lastIdx = counter;
171 for (int i = 0; i < actual.length; i++) {
172 assertEquals(data[i + lastIdx], actual[i]);
173 }
174
175
176
177 counter += actual.length;
178
179 if (channel.parent() != null) {
180 channel.writeAndFlush(randomBufferType(channel.alloc(), actual, 0, actual.length));
181 }
182 }
183
184 @Override
185 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
186 if (exception.compareAndSet(null, cause)) {
187 ctx.close();
188 }
189 }
190 }
191 }