1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
26 import io.netty.handler.codec.Delimiters;
27 import io.netty.handler.codec.string.StringDecoder;
28 import io.netty.handler.codec.string.StringEncoder;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.ImmediateEventExecutor;
31 import io.netty.util.concurrent.Promise;
32 import org.junit.jupiter.api.Test;
33 import org.junit.jupiter.api.TestInfo;
34 import org.junit.jupiter.api.Timeout;
35
36 import java.io.IOException;
37 import java.util.Random;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40
41 public class SocketStringEchoTest extends AbstractSocketTest {
42
43 static final Random random = new Random(3);
44 static final String[] data = new String[1024];
45
46 static {
47 StringBuilder sb = new StringBuilder();
48 for (int i = 0; i < data.length; i ++) {
49 sb.setLength(0);
50 int eLen = random.nextInt(512);
51 int j = 1;
52 while (sb.length() < eLen) {
53 sb.append(String.format("%03X/%x.", i, j++));
54 }
55 if (sb.length() > eLen) {
56 sb.setLength(eLen);
57 }
58 data[i] = sb.toString();
59 }
60 }
61
62 @Test
63 @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
64 public void testStringEcho(TestInfo testInfo) throws Throwable {
65 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
66 @Override
67 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
68 testStringEcho(serverBootstrap, bootstrap);
69 }
70 });
71 }
72
73 public void testStringEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
74 testStringEcho(sb, cb, true);
75 }
76
77 @Test
78 @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
79 public void testStringEchoNotAutoRead(TestInfo testInfo) throws Throwable {
80 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
81 @Override
82 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
83 testStringEchoNotAutoRead(serverBootstrap, bootstrap);
84 }
85 });
86 }
87
88 public void testStringEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
89 testStringEcho(sb, cb, false);
90 }
91
92 private static void testStringEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
93 sb.childOption(ChannelOption.AUTO_READ, autoRead);
94 cb.option(ChannelOption.AUTO_READ, autoRead);
95
96 Promise<Void> serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
97 Promise<Void> clientDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
98 final StringEchoHandler sh = new StringEchoHandler(autoRead, serverDonePromise);
99 final StringEchoHandler ch = new StringEchoHandler(autoRead, clientDonePromise);
100
101 sb.childHandler(new ChannelInitializer<Channel>() {
102 @Override
103 public void initChannel(Channel sch) throws Exception {
104 sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
105 sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
106 sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
107 sch.pipeline().addAfter("decoder", "handler", sh);
108 }
109 });
110
111 cb.handler(new ChannelInitializer<Channel>() {
112 @Override
113 public void initChannel(Channel sch) throws Exception {
114 sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
115 sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
116 sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
117 sch.pipeline().addAfter("decoder", "handler", ch);
118 }
119 });
120
121 Channel sc = sb.bind().sync().channel();
122 Channel cc = cb.connect(sc.localAddress()).sync().channel();
123 for (String element : data) {
124 String delimiter = random.nextBoolean() ? "\r\n" : "\n";
125 cc.writeAndFlush(element + delimiter);
126 }
127
128 ch.donePromise.sync();
129 sh.donePromise.sync();
130 sh.channel.close().sync();
131 ch.channel.close().sync();
132 sc.close().sync();
133
134 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
135 throw sh.exception.get();
136 }
137 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
138 throw ch.exception.get();
139 }
140 if (sh.exception.get() != null) {
141 throw sh.exception.get();
142 }
143 if (ch.exception.get() != null) {
144 throw ch.exception.get();
145 }
146 }
147
148 static class StringEchoHandler extends SimpleChannelInboundHandler<String> {
149 private final boolean autoRead;
150 private final Promise<Void> donePromise;
151 private int dataIndex;
152 volatile Channel channel;
153 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
154
155 StringEchoHandler(boolean autoRead, Promise<Void> donePromise) {
156 this.autoRead = autoRead;
157 this.donePromise = donePromise;
158 }
159
160 @Override
161 public void channelActive(ChannelHandlerContext ctx) throws Exception {
162 channel = ctx.channel();
163 if (!autoRead) {
164 ctx.read();
165 }
166 }
167
168 @Override
169 public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
170 if (!data[dataIndex].equals(msg)) {
171 donePromise.tryFailure(new IllegalStateException("index: " + dataIndex + " didn't match!"));
172 ctx.close();
173 return;
174 }
175
176 if (channel.parent() != null) {
177 String delimiter = random.nextBoolean() ? "\r\n" : "\n";
178 channel.write(msg + delimiter);
179 }
180
181 if (++dataIndex >= data.length) {
182 donePromise.setSuccess(null);
183 }
184 }
185
186 @Override
187 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
188 try {
189 ctx.flush();
190 } finally {
191 if (!autoRead) {
192 ctx.read();
193 }
194 }
195 }
196
197 @Override
198 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
199 if (exception.compareAndSet(null, cause)) {
200 donePromise.tryFailure(new IllegalStateException("exceptionCaught: " + ctx.channel(), cause));
201 ctx.close();
202 }
203 }
204
205 @Override
206 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
207 donePromise.tryFailure(new IllegalStateException("channelInactive: " + ctx.channel()));
208 }
209 }
210 }