View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.buffer.api.DefaultBufferAllocators;
22  import io.netty5.channel.Channel;
23  import io.netty5.channel.ChannelHandler;
24  import io.netty5.channel.ChannelHandlerContext;
25  import io.netty5.channel.ChannelOption;
26  import io.netty5.channel.SimpleChannelInboundHandler;
27  import org.junit.jupiter.api.Test;
28  import org.junit.jupiter.api.TestInfo;
29  import org.junit.jupiter.api.Timeout;
30  
31  import java.io.IOException;
32  import java.util.Random;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import static org.junit.jupiter.api.Assertions.assertEquals;
37  
38  public class SocketEchoTest extends AbstractSocketTest {
39  
40      private static final Random random = new Random();
41      static final byte[] data = new byte[1048576];
42  
43      static {
44          random.nextBytes(data);
45      }
46  
47      @Test
48      @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
49      public void testSimpleEcho(TestInfo testInfo) throws Throwable {
50          run(testInfo, this::testSimpleEcho);
51      }
52  
53      public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
54          testSimpleEcho0(sb, cb, true);
55      }
56  
57      @Test
58      @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
59      public void testSimpleEchoNotAutoRead(TestInfo testInfo) throws Throwable {
60          run(testInfo, this::testSimpleEchoNotAutoRead);
61      }
62  
63      public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
64          testSimpleEcho0(sb, cb, false);
65      }
66  
67      private static void testSimpleEcho0(
68              ServerBootstrap sb, Bootstrap cb, boolean autoRead)
69              throws Throwable {
70          final EchoHandler sh = new EchoHandler(autoRead);
71          final EchoHandler ch = new EchoHandler(autoRead);
72  
73          sb.childHandler(sh);
74          sb.handler(new ChannelHandler() {
75              @Override
76              public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
77                  cause.printStackTrace();
78              }
79          });
80          cb.handler(ch);
81          sb.childOption(ChannelOption.AUTO_READ, autoRead);
82          cb.option(ChannelOption.AUTO_READ, autoRead);
83  
84          Channel sc = sb.bind().asStage().get();
85          Channel cc = cb.connect(sc.localAddress()).asStage().get();
86  
87          try (Buffer src = DefaultBufferAllocators.preferredAllocator().copyOf(data)) {
88              for (int i = 0; i < data.length;) {
89                  int length = Math.min(random.nextInt(1024 * 64), data.length - i);
90                  cc.writeAndFlush(src.readSplit(length));
91                  i += length;
92              }
93          }
94  
95          while (ch.counter < data.length) {
96              if (sh.exception.get() != null) {
97                  break;
98              }
99              if (ch.exception.get() != null) {
100                 break;
101             }
102 
103             try {
104                 Thread.sleep(50);
105             } catch (InterruptedException e) {
106                 // Ignore.
107             }
108         }
109 
110         while (sh.counter < data.length) {
111             if (sh.exception.get() != null) {
112                 break;
113             }
114             if (ch.exception.get() != null) {
115                 break;
116             }
117 
118             try {
119                 Thread.sleep(50);
120             } catch (InterruptedException e) {
121                 // Ignore.
122             }
123         }
124 
125         sh.channel.close().asStage().sync();
126         ch.channel.close().asStage().sync();
127         sc.close().asStage().sync();
128 
129         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
130             throw sh.exception.get();
131         }
132         if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
133             throw ch.exception.get();
134         }
135         if (sh.exception.get() != null) {
136             throw sh.exception.get();
137         }
138         if (ch.exception.get() != null) {
139             throw ch.exception.get();
140         }
141     }
142 
143     private static class EchoHandler extends SimpleChannelInboundHandler<Buffer> {
144         private final boolean autoRead;
145         volatile Channel channel;
146         final AtomicReference<Throwable> exception = new AtomicReference<>();
147         volatile int counter;
148 
149         EchoHandler(boolean autoRead) {
150             this.autoRead = autoRead;
151         }
152 
153         @Override
154         public void channelActive(ChannelHandlerContext ctx)
155                 throws Exception {
156             channel = ctx.channel();
157             if (!autoRead) {
158                 ctx.read();
159             }
160         }
161 
162         @Override
163         public void messageReceived(ChannelHandlerContext ctx, Buffer in) throws Exception {
164             byte[] actual = new byte[in.readableBytes()];
165             in.readBytes(actual, 0, actual.length);
166 
167             int lastIdx = counter;
168             for (int i = 0; i < actual.length; i ++) {
169                 assertEquals(data[i + lastIdx], actual[i]);
170             }
171 
172             if (channel.parent() != null) {
173                 channel.write(ctx.bufferAllocator().copyOf(actual));
174             }
175 
176             counter += actual.length;
177         }
178 
179         @Override
180         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
181             try {
182                 ctx.flush();
183             } finally {
184                 if (!autoRead) {
185                     ctx.read();
186                 }
187             }
188         }
189 
190         @Override
191         public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
192             if (exception.compareAndSet(null, cause)) {
193                 cause.printStackTrace();
194                 ctx.close();
195             }
196         }
197     }
198 }