View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.channel.ChannelInitializer;
26  import io.netty.channel.ChannelOption;
27  import io.netty.channel.SimpleChannelInboundHandler;
28  import io.netty.util.concurrent.DefaultEventExecutorGroup;
29  import io.netty.util.concurrent.EventExecutorGroup;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.junit.jupiter.api.AfterAll;
33  import org.junit.jupiter.api.BeforeAll;
34  import org.junit.jupiter.api.Test;
35  
36  import java.io.IOException;
37  import java.util.Random;
38  import java.util.concurrent.atomic.AtomicReference;
39  import org.junit.jupiter.api.TestInfo;
40  import org.junit.jupiter.api.Timeout;
41  
42  import static org.junit.jupiter.api.Assertions.assertEquals;
43  import static org.junit.jupiter.api.Assertions.assertNotEquals;
44  
45  public class SocketEchoTest extends AbstractSocketTest {
46  
47      private static final Random random = new Random();
48      static final byte[] data = new byte[1048576];
49  
50      private static EventExecutorGroup group;
51  
52      static {
53          random.nextBytes(data);
54      }
55  
56      @BeforeAll
57      public static void createGroup() {
58          group = new DefaultEventExecutorGroup(2);
59      }
60  
61      @AfterAll
62      public static void destroyGroup() throws Exception {
63          group.shutdownGracefully().sync();
64      }
65  
66      @Test
67      @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
68      public void testSimpleEcho(TestInfo testInfo) throws Throwable {
69          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
70              @Override
71              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
72                  testSimpleEcho(serverBootstrap, bootstrap);
73              }
74          });
75      }
76  
77      public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
78          testSimpleEcho0(sb, cb, false, false, true);
79      }
80  
81      @Test
82      @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
83      public void testSimpleEchoNotAutoRead(TestInfo testInfo) throws Throwable {
84          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
85              @Override
86              public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
87                  testSimpleEchoNotAutoRead(sb1, cb1);
88              }
89          });
90      }
91  
92      public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
93          testSimpleEcho0(sb, cb, false, false, false);
94      }
95  
96      @Test
97      public void testSimpleEchoWithAdditionalExecutor(TestInfo testInfo) throws Throwable {
98          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
99              @Override
100             public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
101                 testSimpleEchoWithAdditionalExecutor(sb1, cb1);
102             }
103         });
104     }
105 
106     public void testSimpleEchoWithAdditionalExecutor(ServerBootstrap sb, Bootstrap cb) throws Throwable {
107         testSimpleEcho0(sb, cb, true, false, true);
108     }
109 
110     @Test
111     public void testSimpleEchoWithAdditionalExecutorNotAutoRead(TestInfo testInfo) throws Throwable {
112         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
113             @Override
114             public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
115                 testSimpleEchoWithAdditionalExecutorNotAutoRead(sb1, cb1);
116             }
117         });
118     }
119 
120     public void testSimpleEchoWithAdditionalExecutorNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
121         testSimpleEcho0(sb, cb, true, false, false);
122     }
123 
124     @Test
125     public void testSimpleEchoWithVoidPromise(TestInfo testInfo) throws Throwable {
126         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
127             @Override
128             public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
129                 testSimpleEchoWithVoidPromise(sb1, cb1);
130             }
131         });
132     }
133 
134     public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
135         testSimpleEcho0(sb, cb, false, true, true);
136     }
137 
138     @Test
139     public void testSimpleEchoWithVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
140         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
141             @Override
142             public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
143                 testSimpleEchoWithVoidPromiseNotAutoRead(sb1, cb1);
144             }
145         });
146     }
147 
148     public void testSimpleEchoWithVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
149         testSimpleEcho0(sb, cb, false, true, false);
150     }
151 
152     @Test
153     @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
154     public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(TestInfo testInfo) throws Throwable {
155         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
156             @Override
157             public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
158                 testSimpleEchoWithAdditionalExecutorAndVoidPromise(sb1, cb1);
159             }
160         });
161     }
162 
163     public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
164         testSimpleEcho0(sb, cb, true, true, true);
165     }
166 
167     private static void testSimpleEcho0(
168             ServerBootstrap sb, Bootstrap cb, boolean additionalExecutor, boolean voidPromise, boolean autoRead)
169             throws Throwable {
170 
171         final EchoHandler sh = new EchoHandler(autoRead);
172         final EchoHandler ch = new EchoHandler(autoRead);
173 
174         if (additionalExecutor) {
175             sb.childHandler(new ChannelInitializer<Channel>() {
176                 @Override
177                 protected void initChannel(Channel c) throws Exception {
178                     c.pipeline().addLast(group, sh);
179                 }
180             });
181             cb.handler(new ChannelInitializer<Channel>() {
182                 @Override
183                 protected void initChannel(Channel c) throws Exception {
184                     c.pipeline().addLast(group, ch);
185                 }
186             });
187         } else {
188             sb.childHandler(sh);
189             sb.handler(new ChannelInboundHandlerAdapter() {
190                 @Override
191                 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
192                     cause.printStackTrace();
193                 }
194             });
195             cb.handler(ch);
196         }
197         sb.childOption(ChannelOption.AUTO_READ, autoRead);
198         cb.option(ChannelOption.AUTO_READ, autoRead);
199 
200         Channel sc = sb.bind().sync().channel();
201         Channel cc = cb.connect(sc.localAddress()).sync().channel();
202 
203         for (int i = 0; i < data.length;) {
204             int length = Math.min(random.nextInt(1024 * 64), data.length - i);
205             ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
206             if (voidPromise) {
207                 assertEquals(cc.voidPromise(), cc.writeAndFlush(buf, cc.voidPromise()));
208             } else {
209                 assertNotEquals(cc.voidPromise(), cc.writeAndFlush(buf));
210             }
211             i += length;
212         }
213 
214         while (ch.counter < data.length) {
215             if (sh.exception.get() != null) {
216                 break;
217             }
218             if (ch.exception.get() != null) {
219                 break;
220             }
221 
222             try {
223                 Thread.sleep(50);
224             } catch (InterruptedException e) {
225                 // Ignore.
226             }
227         }
228 
229         while (sh.counter < data.length) {
230             if (sh.exception.get() != null) {
231                 break;
232             }
233             if (ch.exception.get() != null) {
234                 break;
235             }
236 
237             try {
238                 Thread.sleep(50);
239             } catch (InterruptedException e) {
240                 // Ignore.
241             }
242         }
243 
244         sh.channel.close().sync();
245         ch.channel.close().sync();
246         sc.close().sync();
247 
248         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
249             throw sh.exception.get();
250         }
251         if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
252             throw ch.exception.get();
253         }
254         if (sh.exception.get() != null) {
255             throw sh.exception.get();
256         }
257         if (ch.exception.get() != null) {
258             throw ch.exception.get();
259         }
260     }
261 
262     private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
263         private final boolean autoRead;
264         volatile Channel channel;
265         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
266         volatile int counter;
267 
268         EchoHandler(boolean autoRead) {
269             this.autoRead = autoRead;
270         }
271 
272         @Override
273         public void channelActive(ChannelHandlerContext ctx)
274                 throws Exception {
275             channel = ctx.channel();
276             if (!autoRead) {
277                 ctx.read();
278             }
279         }
280 
281         @Override
282         public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
283             byte[] actual = new byte[in.readableBytes()];
284             in.readBytes(actual);
285 
286             int lastIdx = counter;
287             for (int i = 0; i < actual.length; i ++) {
288                 assertEquals(data[i + lastIdx], actual[i]);
289             }
290 
291             if (channel.parent() != null) {
292                 channel.write(Unpooled.wrappedBuffer(actual));
293             }
294 
295             counter += actual.length;
296         }
297 
298         @Override
299         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
300             try {
301                 ctx.flush();
302             } finally {
303                 if (!autoRead) {
304                     ctx.read();
305                 }
306             }
307         }
308 
309         @Override
310         public void exceptionCaught(ChannelHandlerContext ctx,
311                 Throwable cause) throws Exception {
312             if (exception.compareAndSet(null, cause)) {
313                 cause.printStackTrace();
314                 ctx.close();
315             }
316         }
317     }
318 }