View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.udt;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.EventLoopGroup;
25  import io.netty.channel.MultiThreadIoEventLoopGroup;
26  import io.netty.channel.SimpleChannelInboundHandler;
27  import io.netty.channel.group.ChannelGroup;
28  import io.netty.channel.group.DefaultChannelGroup;
29  import io.netty.channel.nio.NioIoHandler;
30  import io.netty.channel.udt.UdtChannel;
31  import io.netty.channel.udt.nio.NioUdtProvider;
32  import io.netty.handler.codec.DelimiterBasedFrameDecoder;
33  import io.netty.handler.codec.Delimiters;
34  import io.netty.handler.codec.string.StringDecoder;
35  import io.netty.handler.codec.string.StringEncoder;
36  import io.netty.util.CharsetUtil;
37  import io.netty.util.NetUtil;
38  import io.netty.util.concurrent.DefaultThreadFactory;
39  import io.netty.util.concurrent.GlobalEventExecutor;
40  import io.netty.util.internal.PlatformDependent;
41  import org.junit.jupiter.api.BeforeAll;
42  import org.junit.jupiter.api.Test;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  import java.net.InetSocketAddress;
47  import java.util.concurrent.ThreadFactory;
48  
49  import static org.junit.jupiter.api.Assertions.assertEquals;
50  import static org.junit.jupiter.api.Assertions.assertTrue;
51  import static org.junit.jupiter.api.Assumptions.assumeFalse;
52  import static org.junit.jupiter.api.Assumptions.assumeTrue;
53  
54  /**
55   * Verify UDT connect/disconnect life cycle.
56   */
57  public class UDTClientServerConnectionTest {
58  
59      static class Client implements Runnable {
60  
61          static final Logger log = LoggerFactory.getLogger(Client.class);
62  
63          private final InetSocketAddress address;
64  
65          volatile Channel channel;
66          volatile boolean isRunning;
67          volatile boolean isShutdown;
68  
69          Client(InetSocketAddress address) {
70              this.address = address;
71          }
72  
73          @Override
74          public void run() {
75              final Bootstrap boot = new Bootstrap();
76              final ThreadFactory clientFactory = new DefaultThreadFactory("client");
77              final EventLoopGroup connectGroup = new MultiThreadIoEventLoopGroup(1,
78                      clientFactory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
79              try {
80                  boot.group(connectGroup)
81                          .channelFactory(NioUdtProvider.BYTE_CONNECTOR)
82                          .handler(new ChannelInitializer<UdtChannel>() {
83  
84                              @Override
85                              protected void initChannel(final UdtChannel ch)
86                                      throws Exception {
87                                  final ChannelPipeline pipeline = ch.pipeline();
88                                  pipeline.addLast("framer",
89                                          new DelimiterBasedFrameDecoder(8192,
90                                                  Delimiters.lineDelimiter()));
91                                  pipeline.addLast("decoder", new StringDecoder(
92                                          CharsetUtil.UTF_8));
93                                  pipeline.addLast("encoder", new StringEncoder(
94                                          CharsetUtil.UTF_8));
95                                  pipeline.addLast("handler", new ClientHandler());
96                              }
97                          });
98                  channel = boot.connect(address).sync().channel();
99                  isRunning = true;
100                 log.info("Client ready.");
101                 waitForRunning(false);
102                 log.info("Client closing...");
103                 channel.close().sync();
104                 isShutdown = true;
105                 log.info("Client is done.");
106             } catch (final Throwable e) {
107                 log.error("Client failed.", e);
108             } finally {
109                 connectGroup.shutdownGracefully().syncUninterruptibly();
110             }
111         }
112 
113         void shutdown() {
114             isRunning = false;
115         }
116 
117         void waitForActive(final boolean isActive) throws Exception {
118             for (int k = 0; k < WAIT_COUNT; k++) {
119                 Thread.sleep(WAIT_SLEEP);
120                 final ClientHandler handler = channel.pipeline().get(
121                         ClientHandler.class);
122                 if (handler != null && isActive == handler.isActive) {
123                     return;
124                 }
125             }
126         }
127 
128         void waitForRunning(final boolean isRunning) throws Exception {
129             for (int k = 0; k < WAIT_COUNT; k++) {
130                 if (isRunning == this.isRunning) {
131                     return;
132                 }
133                 Thread.sleep(WAIT_SLEEP);
134             }
135         }
136 
137         private void waitForShutdown() throws Exception {
138             for (int k = 0; k < WAIT_COUNT; k++) {
139                 if (isShutdown) {
140                     return;
141                 }
142                 Thread.sleep(WAIT_SLEEP);
143             }
144         }
145     }
146 
147     static class ClientHandler extends SimpleChannelInboundHandler<Object> {
148 
149         static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
150 
151         volatile boolean isActive;
152 
153         @Override
154         public void channelActive(final ChannelHandlerContext ctx)
155                 throws Exception {
156             isActive = true;
157             log.info("Client active {}", ctx.channel());
158             super.channelActive(ctx);
159         }
160 
161         @Override
162         public void channelInactive(final ChannelHandlerContext ctx)
163                 throws Exception {
164             isActive = false;
165             log.info("Client inactive {}", ctx.channel());
166             super.channelInactive(ctx);
167         }
168 
169         @Override
170         public void exceptionCaught(final ChannelHandlerContext ctx,
171                 final Throwable cause) throws Exception {
172             log.warn("Client unexpected exception from downstream.", cause);
173             ctx.close();
174         }
175 
176         @Override
177         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
178             log.info("Client received: " + msg);
179         }
180     }
181 
182     static class Server implements Runnable {
183 
184         static final Logger log = LoggerFactory.getLogger(Server.class);
185 
186         final ChannelGroup group = new DefaultChannelGroup("server group", GlobalEventExecutor.INSTANCE);
187 
188         private final InetSocketAddress address;
189 
190         volatile Channel channel;
191         volatile boolean isRunning;
192         volatile boolean isShutdown;
193 
194         Server(InetSocketAddress address) {
195             this.address = address;
196         }
197 
198         @Override
199         public void run() {
200             final ServerBootstrap boot = new ServerBootstrap();
201             final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
202             final ThreadFactory serverFactory = new DefaultThreadFactory("server");
203             final EventLoopGroup acceptGroup = new MultiThreadIoEventLoopGroup(1,
204                     acceptFactory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
205             final EventLoopGroup connectGroup = new MultiThreadIoEventLoopGroup(1,
206                     serverFactory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
207             try {
208                 boot.group(acceptGroup, connectGroup)
209                         .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
210                         .childHandler(new ChannelInitializer<UdtChannel>() {
211                             @Override
212                             protected void initChannel(final UdtChannel ch)
213                                     throws Exception {
214                                 final ChannelPipeline pipeline = ch.pipeline();
215                                 pipeline.addLast("framer",
216                                         new DelimiterBasedFrameDecoder(8192,
217                                                 Delimiters.lineDelimiter()));
218                                 pipeline.addLast("decoder", new StringDecoder(
219                                         CharsetUtil.UTF_8));
220                                 pipeline.addLast("encoder", new StringEncoder(
221                                         CharsetUtil.UTF_8));
222                                 pipeline.addLast("handler", new ServerHandler(
223                                         group));
224                             }
225                         });
226                 channel = boot.bind(address).sync().channel();
227                 isRunning = true;
228                 log.info("Server ready.");
229                 waitForRunning(false);
230                 log.info("Server closing acceptor...");
231                 channel.close().sync();
232                 log.info("Server closing connectors...");
233                 group.close().sync();
234                 isShutdown = true;
235                 log.info("Server is done.");
236             } catch (final Throwable e) {
237                 log.error("Server failure.", e);
238             } finally {
239                 acceptGroup.shutdownGracefully();
240                 connectGroup.shutdownGracefully();
241 
242                 acceptGroup.terminationFuture().syncUninterruptibly();
243                 connectGroup.terminationFuture().syncUninterruptibly();
244             }
245         }
246 
247         void shutdown() {
248             isRunning = false;
249         }
250 
251         void waitForActive(final boolean isActive) throws Exception {
252             for (int k = 0; k < WAIT_COUNT; k++) {
253                 Thread.sleep(WAIT_SLEEP);
254                 if (isActive) {
255                     for (final Channel channel : group) {
256                         final ServerHandler handler = channel.pipeline().get(
257                                 ServerHandler.class);
258                         if (handler != null && handler.isActive) {
259                             return;
260                         }
261                     }
262                 } else {
263                     if (group.isEmpty()) {
264                         return;
265                     }
266                 }
267             }
268         }
269 
270         void waitForRunning(final boolean isRunning) throws Exception {
271             for (int k = 0; k < WAIT_COUNT; k++) {
272                 if (isRunning == this.isRunning) {
273                     return;
274                 }
275                 Thread.sleep(WAIT_SLEEP);
276             }
277         }
278 
279         void waitForShutdown() throws Exception {
280             for (int k = 0; k < WAIT_COUNT; k++) {
281                 if (isShutdown) {
282                     return;
283                 }
284                 Thread.sleep(WAIT_SLEEP);
285             }
286         }
287     }
288 
289     static class ServerHandler extends
290             SimpleChannelInboundHandler<Object> {
291 
292         static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
293 
294         final ChannelGroup group;
295 
296         volatile boolean isActive;
297 
298         ServerHandler(final ChannelGroup group) {
299             this.group = group;
300         }
301 
302         @Override
303         public void channelActive(final ChannelHandlerContext ctx)
304                 throws Exception {
305             group.add(ctx.channel());
306             isActive = true;
307             log.info("Server active  : {}", ctx.channel());
308             super.channelActive(ctx);
309         }
310 
311         @Override
312         public void channelInactive(final ChannelHandlerContext ctx)
313                 throws Exception {
314             group.remove(ctx.channel());
315             isActive = false;
316             log.info("Server inactive: {}", ctx.channel());
317             super.channelInactive(ctx);
318         }
319 
320         @Override
321         public void exceptionCaught(final ChannelHandlerContext ctx,
322                 final Throwable cause) {
323             log.warn("Server close on exception.", cause);
324             ctx.close();
325         }
326 
327         @Override
328         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
329             log.info("Server received: " + msg);
330         }
331     }
332     static final Logger log = LoggerFactory
333             .getLogger(UDTClientServerConnectionTest.class);
334 
335     /**
336      * Maximum wait time is 5 seconds.
337      * <p>
338      * wait-time = {@code WAIT_COUNT} * {@value #WAIT_SLEEP}
339      */
340     static final int WAIT_COUNT = 50;
341     static final int WAIT_SLEEP = 100;
342 
343     @BeforeAll
344     public static void assumeUdt() {
345         assumeTrue(canLoadAndInit(), "com.barchart.udt.SocketUDT can not be loaded and initialized");
346         assumeFalse(PlatformDependent.isJ9Jvm(), "Not supported on J9 JVM");
347     }
348 
349     private static boolean canLoadAndInit() {
350         try {
351             Class.forName("com.barchart.udt.SocketUDT", true,
352                     UDTClientServerConnectionTest.class.getClassLoader());
353             return true;
354         } catch (Throwable e) {
355             return false;
356         }
357     }
358 
359     /**
360      * Verify UDT client/server connect and disconnect.
361      */
362     @Test
363     public void connection() throws Exception {
364         log.info("Starting server.");
365         // Using LOCALHOST4 as UDT transport does not support IPV6 :(
366         final Server server = new Server(new InetSocketAddress(NetUtil.LOCALHOST4, 0));
367         final Thread serverTread = new Thread(server, "server-*");
368         serverTread.start();
369         server.waitForRunning(true);
370         assertTrue(server.isRunning);
371 
372         log.info("Starting client.");
373         final Client client = new Client((InetSocketAddress) server.channel.localAddress());
374         final Thread clientThread = new Thread(client, "client-*");
375         clientThread.start();
376         client.waitForRunning(true);
377         assertTrue(client.isRunning);
378 
379         log.info("Wait till connection is active.");
380         client.waitForActive(true);
381         server.waitForActive(true);
382 
383         log.info("Verify connection is active.");
384         assertEquals(1, server.group.size(), "group must have one");
385 
386         log.info("Stopping client.");
387         client.shutdown();
388         client.waitForShutdown();
389         assertTrue(client.isShutdown);
390 
391         log.info("Wait till connection is inactive.");
392         client.waitForActive(false);
393         server.waitForActive(false);
394 
395         log.info("Verify connection is inactive.");
396         assertEquals(0, server.group.size(), "group must be empty");
397 
398         log.info("Stopping server.");
399         server.shutdown();
400         server.waitForShutdown();
401         assertTrue(server.isShutdown);
402 
403         log.info("Finished server.");
404     }
405 
406 }