View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.channel.Channel;
22  import io.netty5.channel.ChannelHandlerContext;
23  import io.netty5.channel.ChannelInitializer;
24  import io.netty5.channel.ChannelOption;
25  import io.netty5.channel.SimpleChannelInboundHandler;
26  import io.netty5.handler.codec.FixedLengthFrameDecoder;
27  import org.junit.jupiter.api.Test;
28  import org.junit.jupiter.api.TestInfo;
29  
30  import java.io.IOException;
31  import java.util.Random;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import static org.junit.jupiter.api.Assertions.assertEquals;
35  
36  public class SocketFixedLengthEchoTest extends AbstractSocketTest {
37  
38      private static final Random random = new Random();
39      static final byte[] data = new byte[1048576];
40  
41      static {
42          random.nextBytes(data);
43      }
44  
45      @Test
46      public void testFixedLengthEcho(TestInfo testInfo) throws Throwable {
47          run(testInfo, this::testFixedLengthEcho);
48      }
49  
50      @Test
51      public void testFixedLengthEchoNotAutoRead(TestInfo testInfo) throws Throwable {
52          run(testInfo, this::testFixedLengthEchoNotAutoRead);
53      }
54  
55      public void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
56          testFixedLengthEcho(sb, cb, true);
57      }
58  
59      public void testFixedLengthEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
60          testFixedLengthEcho(sb, cb, false);
61      }
62  
63      private static void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
64          final EchoHandler sh = new EchoHandler(autoRead);
65          final EchoHandler ch = new EchoHandler(autoRead);
66  
67          sb.childOption(ChannelOption.AUTO_READ, autoRead);
68          sb.childHandler(new ChannelInitializer<>() {
69              @Override
70              public void initChannel(Channel sch) throws Exception {
71                  sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
72                  sch.pipeline().addAfter("decoder", "handler", sh);
73              }
74          });
75  
76          cb.option(ChannelOption.AUTO_READ, autoRead);
77          cb.handler(new ChannelInitializer<>() {
78              @Override
79              public void initChannel(Channel sch) throws Exception {
80                  sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
81                  sch.pipeline().addAfter("decoder", "handler", ch);
82              }
83          });
84  
85          Channel sc = sb.bind().asStage().get();
86          Channel cc = cb.connect(sc.localAddress()).asStage().get();
87  
88          try (Buffer buffer = sc.bufferAllocator().copyOf(data)) {
89              for (int i = 0; i < data.length;) {
90                  int length = Math.min(random.nextInt(1024 * 3), data.length - i);
91                  cc.writeAndFlush(buffer.readSplit(length));
92                  i += length;
93              }
94          }
95  
96          while (ch.counter < data.length) {
97              if (sh.exception.get() != null) {
98                  break;
99              }
100             if (ch.exception.get() != null) {
101                 break;
102             }
103 
104             Thread.sleep(50);
105         }
106 
107         while (sh.counter < data.length) {
108             if (sh.exception.get() != null) {
109                 break;
110             }
111             if (ch.exception.get() != null) {
112                 break;
113             }
114 
115             Thread.sleep(50);
116         }
117 
118         sh.channel.close().asStage().sync();
119         ch.channel.close().asStage().sync();
120         sc.close().asStage().sync();
121 
122         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
123             throw sh.exception.get();
124         }
125         if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
126             throw ch.exception.get();
127         }
128         if (sh.exception.get() != null) {
129             throw sh.exception.get();
130         }
131         if (ch.exception.get() != null) {
132             throw ch.exception.get();
133         }
134     }
135 
136     private static class EchoHandler extends SimpleChannelInboundHandler<Buffer> {
137         private final boolean autoRead;
138         volatile Channel channel;
139         final AtomicReference<Throwable> exception = new AtomicReference<>();
140         volatile int counter;
141 
142         EchoHandler(boolean autoRead) {
143             this.autoRead = autoRead;
144         }
145 
146         @Override
147         public void channelActive(ChannelHandlerContext ctx) throws Exception {
148             channel = ctx.channel();
149             if (!autoRead) {
150                 ctx.read();
151             }
152         }
153 
154         @Override
155         public void messageReceived(ChannelHandlerContext ctx, Buffer msg) throws Exception {
156             assertEquals(1024, msg.readableBytes());
157 
158             byte[] actual = new byte[msg.readableBytes()];
159             msg.copyInto(msg.readerOffset(), actual, 0, msg.readableBytes());
160 
161             int lastIdx = counter;
162             for (int i = 0; i < actual.length; i ++) {
163                 assertEquals(data[i + lastIdx], actual[i]);
164             }
165 
166             if (channel.parent() != null) {
167                 channel.write(msg.split());
168             }
169 
170             counter += actual.length;
171         }
172 
173         @Override
174         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
175             try {
176                 ctx.flush();
177             } finally {
178                 if (!autoRead) {
179                     ctx.read();
180                 }
181             }
182         }
183 
184         @Override
185         public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
186             if (exception.compareAndSet(null, cause)) {
187                 ctx.close();
188             }
189         }
190     }
191 }