/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.SimpleChannelInboundHandler;
25  import io.netty.handler.codec.DelimiterBasedFrameDecoder;
26  import io.netty.handler.codec.Delimiters;
27  import io.netty.handler.codec.string.StringDecoder;
28  import io.netty.handler.codec.string.StringEncoder;
29  import io.netty.util.CharsetUtil;
30  import io.netty.util.concurrent.ImmediateEventExecutor;
31  import io.netty.util.concurrent.Promise;
32  import org.junit.jupiter.api.Test;
33  import org.junit.jupiter.api.TestInfo;
34  import org.junit.jupiter.api.Timeout;
35  
36  import java.io.IOException;
37  import java.util.Random;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  public class SocketStringEchoTest extends AbstractSocketTest {
42  
43      static final Random random = new Random(3);
44      static final String[] data = new String[1024];
45  
46      static {
47          StringBuilder sb = new StringBuilder();
48          for (int i = 0; i < data.length; i ++) {
49              sb.setLength(0);
50              int eLen = random.nextInt(512);
51              int j = 1;
52              while (sb.length() < eLen) {
53                  sb.append(String.format("%03X/%x.", i, j++));
54              }
55              if (sb.length() > eLen) {
56                  sb.setLength(eLen);
57              }
58              data[i] = sb.toString();
59          }
60      }
61  
62      @Test
63      @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
64      public void testStringEcho(TestInfo testInfo) throws Throwable {
65          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
66              @Override
67              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
68                  testStringEcho(serverBootstrap, bootstrap);
69              }
70          });
71      }
72  
73      public void testStringEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
74          testStringEcho(sb, cb, true);
75      }
76  
77      @Test
78      @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
79      public void testStringEchoNotAutoRead(TestInfo testInfo) throws Throwable {
80          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
81              @Override
82              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
83                  testStringEchoNotAutoRead(serverBootstrap, bootstrap);
84              }
85          });
86      }
87  
88      public void testStringEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
89          testStringEcho(sb, cb, false);
90      }
91  
92      private static void testStringEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
93          sb.childOption(ChannelOption.AUTO_READ, autoRead);
94          cb.option(ChannelOption.AUTO_READ, autoRead);
95  
96          Promise<Void> serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
97          Promise<Void> clientDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
98          final StringEchoHandler sh = new StringEchoHandler(autoRead, serverDonePromise);
99          final StringEchoHandler ch = new StringEchoHandler(autoRead, clientDonePromise);
100 
101         sb.childHandler(new ChannelInitializer<Channel>() {
102             @Override
103             public void initChannel(Channel sch) throws Exception {
104                 sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
105                 sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
106                 sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
107                 sch.pipeline().addAfter("decoder", "handler", sh);
108             }
109         });
110 
111         cb.handler(new ChannelInitializer<Channel>() {
112             @Override
113             public void initChannel(Channel sch) throws Exception {
114                 sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
115                 sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
116                 sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
117                 sch.pipeline().addAfter("decoder", "handler", ch);
118             }
119         });
120 
121         Channel sc = sb.bind().sync().channel();
122         Channel cc = cb.connect(sc.localAddress()).sync().channel();
123         for (String element : data) {
124             String delimiter = random.nextBoolean() ? "\r\n" : "\n";
125             cc.writeAndFlush(element + delimiter);
126         }
127 
128         ch.donePromise.sync();
129         sh.donePromise.sync();
130         sh.channel.close().sync();
131         ch.channel.close().sync();
132         sc.close().sync();
133 
134         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
135             throw sh.exception.get();
136         }
137         if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
138             throw ch.exception.get();
139         }
140         if (sh.exception.get() != null) {
141             throw sh.exception.get();
142         }
143         if (ch.exception.get() != null) {
144             throw ch.exception.get();
145         }
146     }
147 
148     static class StringEchoHandler extends SimpleChannelInboundHandler<String> {
149         private final boolean autoRead;
150         private final Promise<Void> donePromise;
151         private int dataIndex;
152         volatile Channel channel;
153         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
154 
155         StringEchoHandler(boolean autoRead, Promise<Void> donePromise) {
156             this.autoRead = autoRead;
157             this.donePromise = donePromise;
158         }
159 
160         @Override
161         public void channelActive(ChannelHandlerContext ctx) throws Exception {
162             channel = ctx.channel();
163             if (!autoRead) {
164                 ctx.read();
165             }
166         }
167 
168         @Override
169         public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
170             if (!data[dataIndex].equals(msg)) {
171                 donePromise.tryFailure(new IllegalStateException("index: " + dataIndex + " didn't match!"));
172                 ctx.close();
173                 return;
174             }
175 
176             if (channel.parent() != null) {
177                 String delimiter = random.nextBoolean() ? "\r\n" : "\n";
178                 channel.write(msg + delimiter);
179             }
180 
181             if (++dataIndex >= data.length) {
182                 donePromise.setSuccess(null);
183             }
184         }
185 
186         @Override
187         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
188             try {
189                 ctx.flush();
190             } finally {
191                 if (!autoRead) {
192                     ctx.read();
193                 }
194             }
195         }
196 
197         @Override
198         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
199             if (exception.compareAndSet(null, cause)) {
200                 donePromise.tryFailure(new IllegalStateException("exceptionCaught: " + ctx.channel(), cause));
201                 ctx.close();
202             }
203         }
204 
205         @Override
206         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
207             donePromise.tryFailure(new IllegalStateException("channelInactive: " + ctx.channel()));
208         }
209     }
210 }