View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInboundHandlerAdapter;
23  import io.netty.channel.ChannelInitializer;
24  import io.netty.channel.ChannelOption;
25  import io.netty.handler.codec.serialization.ClassResolvers;
26  import io.netty.handler.codec.serialization.ObjectDecoder;
27  import io.netty.handler.codec.serialization.ObjectEncoder;
28  import org.junit.jupiter.api.Test;
29  import org.junit.jupiter.api.TestInfo;
30  
31  import java.io.IOException;
32  import java.util.Random;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import static org.junit.jupiter.api.Assertions.assertEquals;
36  
37  public class SocketObjectEchoTest extends AbstractSocketTest {
38  
39      static final Random random = new Random();
40      static final String[] data = new String[1024];
41  
42      static {
43          for (int i = 0; i < data.length; i ++) {
44              int eLen = random.nextInt(512);
45              char[] e = new char[eLen];
46              for (int j = 0; j < eLen; j ++) {
47                  e[j] = (char) ('a' + random.nextInt(26));
48              }
49  
50              data[i] = new String(e);
51          }
52      }
53  
54      @Test
55      public void testObjectEcho(TestInfo testInfo) throws Throwable {
56          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
57              @Override
58              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
59                  testObjectEcho(serverBootstrap, bootstrap);
60              }
61          });
62      }
63  
64      public void testObjectEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
65          testObjectEcho(sb, cb, true);
66      }
67  
68      @Test
69      public void testObjectEchoNotAutoRead(TestInfo testInfo) throws Throwable {
70          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
71              @Override
72              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
73                  testObjectEchoNotAutoRead(serverBootstrap, bootstrap);
74              }
75          });
76      }
77  
78      public void testObjectEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
79          testObjectEcho(sb, cb, false);
80      }
81  
82      private static void testObjectEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
83          sb.childOption(ChannelOption.AUTO_READ, autoRead);
84          cb.option(ChannelOption.AUTO_READ, autoRead);
85  
86          final EchoHandler sh = new EchoHandler(autoRead);
87          final EchoHandler ch = new EchoHandler(autoRead);
88  
89          sb.childHandler(new ChannelInitializer<Channel>() {
90              @Override
91              public void initChannel(Channel sch) throws Exception {
92                  sch.pipeline().addLast(
93                          new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
94                          new ObjectEncoder(),
95                          sh);
96              }
97          });
98  
99          cb.handler(new ChannelInitializer<Channel>() {
100             @Override
101             public void initChannel(Channel sch) throws Exception {
102                 sch.pipeline().addLast(
103                         new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
104                         new ObjectEncoder(),
105                         ch);
106             }
107         });
108 
109         Channel sc = sb.bind().sync().channel();
110         Channel cc = cb.connect(sc.localAddress()).sync().channel();
111         for (String element : data) {
112             cc.writeAndFlush(element);
113         }
114 
115         while (ch.counter < data.length) {
116             if (sh.exception.get() != null) {
117                 break;
118             }
119             if (ch.exception.get() != null) {
120                 break;
121             }
122 
123             try {
124                 Thread.sleep(50);
125             } catch (InterruptedException e) {
126                 // Ignore.
127             }
128         }
129 
130         while (sh.counter < data.length) {
131             if (sh.exception.get() != null) {
132                 break;
133             }
134             if (ch.exception.get() != null) {
135                 break;
136             }
137 
138             try {
139                 Thread.sleep(50);
140             } catch (InterruptedException e) {
141                 // Ignore.
142             }
143         }
144 
145         sh.channel.close().sync();
146         ch.channel.close().sync();
147         sc.close().sync();
148 
149         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
150             throw sh.exception.get();
151         }
152         if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
153             throw ch.exception.get();
154         }
155         if (sh.exception.get() != null) {
156             throw sh.exception.get();
157         }
158         if (ch.exception.get() != null) {
159             throw ch.exception.get();
160         }
161     }
162 
163     private static class EchoHandler extends ChannelInboundHandlerAdapter {
164         private final boolean autoRead;
165         volatile Channel channel;
166         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
167         volatile int counter;
168 
169         EchoHandler(boolean autoRead) {
170             this.autoRead = autoRead;
171         }
172 
173         @Override
174         public void channelActive(ChannelHandlerContext ctx)
175                 throws Exception {
176             channel = ctx.channel();
177             if (!autoRead) {
178                 ctx.read();
179             }
180         }
181 
182         @Override
183         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
184             assertEquals(data[counter], msg);
185 
186             if (channel.parent() != null) {
187                 channel.write(msg);
188             }
189 
190             counter ++;
191         }
192 
193         @Override
194         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
195             try {
196                 ctx.flush();
197             } finally {
198                 if (!autoRead) {
199                     ctx.read();
200                 }
201             }
202         }
203 
204         @Override
205         public void exceptionCaught(ChannelHandlerContext ctx,
206                 Throwable cause) throws Exception {
207             if (exception.compareAndSet(null, cause)) {
208                 ctx.close();
209             }
210         }
211     }
212 }