1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.sctp;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelInitializer;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.channel.sctp.SctpChannel;
26 import io.netty.handler.codec.sctp.SctpInboundByteStreamHandler;
27 import io.netty.handler.codec.sctp.SctpMessageCompletionHandler;
28 import io.netty.handler.codec.sctp.SctpOutboundByteStreamHandler;
29 import io.netty.testsuite.util.TestUtils;
30 import org.junit.jupiter.api.Test;
31
32 import java.io.IOException;
33 import java.util.Random;
34 import java.util.concurrent.atomic.AtomicReference;
35 import org.junit.jupiter.api.TestInfo;
36
37 import static io.netty.testsuite.transport.TestsuitePermutation.randomBufferType;
38 import static org.junit.jupiter.api.Assertions.assertEquals;
39 import static org.junit.jupiter.api.Assumptions.assumeTrue;
40
41 public class SctpEchoTest extends AbstractSctpTest {
42
43 private static final Random random = new Random();
44 static final byte[] data = new byte[4096];
45
46 static {
47 random.nextBytes(data);
48 }
49
50 @Test
51 public void testSimpleEcho(TestInfo testInfo) throws Throwable {
52 assumeTrue(TestUtils.isSctpSupported());
53 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
54 @Override
55 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
56 testSimpleEcho(serverBootstrap, bootstrap);
57 }
58 });
59 }
60
61 public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
62 testSimpleEcho0(sb, cb, false);
63 }
64
65 @Test
66 public void testSimpleEchoUnordered(TestInfo testInfo) throws Throwable {
67 assumeTrue(TestUtils.isSctpSupported());
68 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
69 @Override
70 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
71 testSimpleEchoUnordered(serverBootstrap, bootstrap);
72 }
73 });
74 }
75
76 public void testSimpleEchoUnordered(ServerBootstrap sb, Bootstrap cb) throws Throwable {
77 testSimpleEcho0(sb, cb, true);
78 }
79
80 private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, final boolean unordered) throws Throwable {
81 final EchoHandler sh = new EchoHandler();
82 final EchoHandler ch = new EchoHandler();
83
84 sb.childHandler(new ChannelInitializer<SctpChannel>() {
85 @Override
86 public void initChannel(SctpChannel c) throws Exception {
87 c.pipeline().addLast(
88 new SctpMessageCompletionHandler(),
89 new SctpInboundByteStreamHandler(0, 0),
90 new SctpOutboundByteStreamHandler(0, 0, unordered),
91 sh);
92 }
93 });
94 cb.handler(new ChannelInitializer<SctpChannel>() {
95 @Override
96 public void initChannel(SctpChannel c) throws Exception {
97 c.pipeline().addLast(
98 new SctpMessageCompletionHandler(),
99 new SctpInboundByteStreamHandler(0, 0),
100 new SctpOutboundByteStreamHandler(0, 0, unordered),
101 ch);
102 }
103 });
104
105 Channel sc = sb.bind().sync().channel();
106 Channel cc = cb.connect(sc.localAddress()).sync().channel();
107
108 for (int i = 0; i < data.length;) {
109 int length = Math.min(random.nextInt(1024 * 64), data.length - i);
110 ByteBuf msg = randomBufferType(sc.alloc(), data, i, length);
111 cc.writeAndFlush(msg);
112 i += length;
113 }
114
115 while (ch.counter < data.length) {
116 if (sh.exception.get() != null) {
117 break;
118 }
119 if (ch.exception.get() != null) {
120 break;
121 }
122
123 Thread.sleep(50);
124 }
125
126 while (sh.counter < data.length) {
127 if (sh.exception.get() != null) {
128 break;
129 }
130 if (ch.exception.get() != null) {
131 break;
132 }
133
134 Thread.sleep(50);
135 }
136
137 sh.channel.close().sync();
138 ch.channel.close().sync();
139 sc.close().sync();
140
141 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
142 throw sh.exception.get();
143 }
144 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
145 throw ch.exception.get();
146 }
147 if (sh.exception.get() != null) {
148 throw sh.exception.get();
149 }
150 if (ch.exception.get() != null) {
151 throw ch.exception.get();
152 }
153 }
154
155 private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
156 volatile Channel channel;
157 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
158 volatile int counter;
159
160 @Override
161 public void channelActive(ChannelHandlerContext ctx) throws Exception {
162 channel = ctx.channel();
163 }
164
165 @Override
166 public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
167 byte[] actual = new byte[in.readableBytes()];
168 in.readBytes(actual);
169
170 int lastIdx = counter;
171 for (int i = 0; i < actual.length; i++) {
172 assertEquals(data[i + lastIdx], actual[i]);
173 }
174
175
176
177 counter += actual.length;
178
179 if (channel.parent() != null) {
180 channel.writeAndFlush(randomBufferType(channel.alloc(), actual, 0, actual.length));
181 }
182 }
183
184 @Override
185 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
186 if (exception.compareAndSet(null, cause)) {
187 ctx.close();
188 }
189 }
190 }
191 }