1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.buffer.api.DefaultBufferAllocators;
22 import io.netty5.channel.Channel;
23 import io.netty5.channel.ChannelHandler;
24 import io.netty5.channel.ChannelHandlerContext;
25 import io.netty5.channel.ChannelOption;
26 import io.netty5.channel.SimpleChannelInboundHandler;
27 import org.junit.jupiter.api.Test;
28 import org.junit.jupiter.api.TestInfo;
29 import org.junit.jupiter.api.Timeout;
30
31 import java.io.IOException;
32 import java.util.Random;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import static org.junit.jupiter.api.Assertions.assertEquals;
37
38 public class SocketEchoTest extends AbstractSocketTest {
39
40 private static final Random random = new Random();
41 static final byte[] data = new byte[1048576];
42
43 static {
44 random.nextBytes(data);
45 }
46
47 @Test
48 @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
49 public void testSimpleEcho(TestInfo testInfo) throws Throwable {
50 run(testInfo, this::testSimpleEcho);
51 }
52
53 public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
54 testSimpleEcho0(sb, cb, true);
55 }
56
57 @Test
58 @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
59 public void testSimpleEchoNotAutoRead(TestInfo testInfo) throws Throwable {
60 run(testInfo, this::testSimpleEchoNotAutoRead);
61 }
62
63 public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
64 testSimpleEcho0(sb, cb, false);
65 }
66
67 private static void testSimpleEcho0(
68 ServerBootstrap sb, Bootstrap cb, boolean autoRead)
69 throws Throwable {
70 final EchoHandler sh = new EchoHandler(autoRead);
71 final EchoHandler ch = new EchoHandler(autoRead);
72
73 sb.childHandler(sh);
74 sb.handler(new ChannelHandler() {
75 @Override
76 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
77 cause.printStackTrace();
78 }
79 });
80 cb.handler(ch);
81 sb.childOption(ChannelOption.AUTO_READ, autoRead);
82 cb.option(ChannelOption.AUTO_READ, autoRead);
83
84 Channel sc = sb.bind().asStage().get();
85 Channel cc = cb.connect(sc.localAddress()).asStage().get();
86
87 try (Buffer src = DefaultBufferAllocators.preferredAllocator().copyOf(data)) {
88 for (int i = 0; i < data.length;) {
89 int length = Math.min(random.nextInt(1024 * 64), data.length - i);
90 cc.writeAndFlush(src.readSplit(length));
91 i += length;
92 }
93 }
94
95 while (ch.counter < data.length) {
96 if (sh.exception.get() != null) {
97 break;
98 }
99 if (ch.exception.get() != null) {
100 break;
101 }
102
103 try {
104 Thread.sleep(50);
105 } catch (InterruptedException e) {
106
107 }
108 }
109
110 while (sh.counter < data.length) {
111 if (sh.exception.get() != null) {
112 break;
113 }
114 if (ch.exception.get() != null) {
115 break;
116 }
117
118 try {
119 Thread.sleep(50);
120 } catch (InterruptedException e) {
121
122 }
123 }
124
125 sh.channel.close().asStage().sync();
126 ch.channel.close().asStage().sync();
127 sc.close().asStage().sync();
128
129 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
130 throw sh.exception.get();
131 }
132 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
133 throw ch.exception.get();
134 }
135 if (sh.exception.get() != null) {
136 throw sh.exception.get();
137 }
138 if (ch.exception.get() != null) {
139 throw ch.exception.get();
140 }
141 }
142
143 private static class EchoHandler extends SimpleChannelInboundHandler<Buffer> {
144 private final boolean autoRead;
145 volatile Channel channel;
146 final AtomicReference<Throwable> exception = new AtomicReference<>();
147 volatile int counter;
148
149 EchoHandler(boolean autoRead) {
150 this.autoRead = autoRead;
151 }
152
153 @Override
154 public void channelActive(ChannelHandlerContext ctx)
155 throws Exception {
156 channel = ctx.channel();
157 if (!autoRead) {
158 ctx.read();
159 }
160 }
161
162 @Override
163 public void messageReceived(ChannelHandlerContext ctx, Buffer in) throws Exception {
164 byte[] actual = new byte[in.readableBytes()];
165 in.readBytes(actual, 0, actual.length);
166
167 int lastIdx = counter;
168 for (int i = 0; i < actual.length; i ++) {
169 assertEquals(data[i + lastIdx], actual[i]);
170 }
171
172 if (channel.parent() != null) {
173 channel.write(ctx.bufferAllocator().copyOf(actual));
174 }
175
176 counter += actual.length;
177 }
178
179 @Override
180 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
181 try {
182 ctx.flush();
183 } finally {
184 if (!autoRead) {
185 ctx.read();
186 }
187 }
188 }
189
190 @Override
191 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
192 if (exception.compareAndSet(null, cause)) {
193 cause.printStackTrace();
194 ctx.close();
195 }
196 }
197 }
198 }