1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.channel.Channel;
22 import io.netty5.channel.ChannelHandlerContext;
23 import io.netty5.channel.ChannelInitializer;
24 import io.netty5.channel.ChannelOption;
25 import io.netty5.channel.SimpleChannelInboundHandler;
26 import io.netty5.handler.codec.FixedLengthFrameDecoder;
27 import org.junit.jupiter.api.Test;
28 import org.junit.jupiter.api.TestInfo;
29
30 import java.io.IOException;
31 import java.util.Random;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import static org.junit.jupiter.api.Assertions.assertEquals;
35
36 public class SocketFixedLengthEchoTest extends AbstractSocketTest {
37
38 private static final Random random = new Random();
39 static final byte[] data = new byte[1048576];
40
41 static {
42 random.nextBytes(data);
43 }
44
45 @Test
46 public void testFixedLengthEcho(TestInfo testInfo) throws Throwable {
47 run(testInfo, this::testFixedLengthEcho);
48 }
49
50 @Test
51 public void testFixedLengthEchoNotAutoRead(TestInfo testInfo) throws Throwable {
52 run(testInfo, this::testFixedLengthEchoNotAutoRead);
53 }
54
55 public void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
56 testFixedLengthEcho(sb, cb, true);
57 }
58
59 public void testFixedLengthEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
60 testFixedLengthEcho(sb, cb, false);
61 }
62
63 private static void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
64 final EchoHandler sh = new EchoHandler(autoRead);
65 final EchoHandler ch = new EchoHandler(autoRead);
66
67 sb.childOption(ChannelOption.AUTO_READ, autoRead);
68 sb.childHandler(new ChannelInitializer<>() {
69 @Override
70 public void initChannel(Channel sch) throws Exception {
71 sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
72 sch.pipeline().addAfter("decoder", "handler", sh);
73 }
74 });
75
76 cb.option(ChannelOption.AUTO_READ, autoRead);
77 cb.handler(new ChannelInitializer<>() {
78 @Override
79 public void initChannel(Channel sch) throws Exception {
80 sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
81 sch.pipeline().addAfter("decoder", "handler", ch);
82 }
83 });
84
85 Channel sc = sb.bind().asStage().get();
86 Channel cc = cb.connect(sc.localAddress()).asStage().get();
87
88 try (Buffer buffer = sc.bufferAllocator().copyOf(data)) {
89 for (int i = 0; i < data.length;) {
90 int length = Math.min(random.nextInt(1024 * 3), data.length - i);
91 cc.writeAndFlush(buffer.readSplit(length));
92 i += length;
93 }
94 }
95
96 while (ch.counter < data.length) {
97 if (sh.exception.get() != null) {
98 break;
99 }
100 if (ch.exception.get() != null) {
101 break;
102 }
103
104 Thread.sleep(50);
105 }
106
107 while (sh.counter < data.length) {
108 if (sh.exception.get() != null) {
109 break;
110 }
111 if (ch.exception.get() != null) {
112 break;
113 }
114
115 Thread.sleep(50);
116 }
117
118 sh.channel.close().asStage().sync();
119 ch.channel.close().asStage().sync();
120 sc.close().asStage().sync();
121
122 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
123 throw sh.exception.get();
124 }
125 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
126 throw ch.exception.get();
127 }
128 if (sh.exception.get() != null) {
129 throw sh.exception.get();
130 }
131 if (ch.exception.get() != null) {
132 throw ch.exception.get();
133 }
134 }
135
136 private static class EchoHandler extends SimpleChannelInboundHandler<Buffer> {
137 private final boolean autoRead;
138 volatile Channel channel;
139 final AtomicReference<Throwable> exception = new AtomicReference<>();
140 volatile int counter;
141
142 EchoHandler(boolean autoRead) {
143 this.autoRead = autoRead;
144 }
145
146 @Override
147 public void channelActive(ChannelHandlerContext ctx) throws Exception {
148 channel = ctx.channel();
149 if (!autoRead) {
150 ctx.read();
151 }
152 }
153
154 @Override
155 public void messageReceived(ChannelHandlerContext ctx, Buffer msg) throws Exception {
156 assertEquals(1024, msg.readableBytes());
157
158 byte[] actual = new byte[msg.readableBytes()];
159 msg.copyInto(msg.readerOffset(), actual, 0, msg.readableBytes());
160
161 int lastIdx = counter;
162 for (int i = 0; i < actual.length; i ++) {
163 assertEquals(data[i + lastIdx], actual[i]);
164 }
165
166 if (channel.parent() != null) {
167 channel.write(msg.split());
168 }
169
170 counter += actual.length;
171 }
172
173 @Override
174 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
175 try {
176 ctx.flush();
177 } finally {
178 if (!autoRead) {
179 ctx.read();
180 }
181 }
182 }
183
184 @Override
185 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
186 if (exception.compareAndSet(null, cause)) {
187 ctx.close();
188 }
189 }
190 }
191 }