View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.udt;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.EventLoopGroup;
25  import io.netty.channel.MultiThreadIoEventLoopGroup;
26  import io.netty.channel.SimpleChannelInboundHandler;
27  import io.netty.channel.group.ChannelGroup;
28  import io.netty.channel.group.DefaultChannelGroup;
29  import io.netty.channel.nio.NioIoHandler;
30  import io.netty.channel.udt.UdtChannel;
31  import io.netty.channel.udt.nio.NioUdtProvider;
32  import io.netty.handler.codec.DelimiterBasedFrameDecoder;
33  import io.netty.handler.codec.Delimiters;
34  import io.netty.handler.codec.string.StringDecoder;
35  import io.netty.handler.codec.string.StringEncoder;
36  import io.netty.util.CharsetUtil;
37  import io.netty.util.NetUtil;
38  import io.netty.util.concurrent.DefaultThreadFactory;
39  import io.netty.util.concurrent.GlobalEventExecutor;
40  import io.netty.util.internal.PlatformDependent;
41  import org.junit.jupiter.api.BeforeAll;
42  import org.junit.jupiter.api.Test;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  import java.net.InetSocketAddress;
47  import java.util.concurrent.ThreadFactory;
48  
49  import static org.junit.jupiter.api.Assertions.assertEquals;
50  import static org.junit.jupiter.api.Assertions.assertTrue;
51  import static org.junit.jupiter.api.Assumptions.assumeFalse;
52  import static org.junit.jupiter.api.Assumptions.assumeTrue;
53  
54  /**
55   * Verify UDT connect/disconnect life cycle.
56   */
57  public class UDTClientServerConnectionTest {
58  
59      static class Client implements Runnable {
60  
61          static final Logger log = LoggerFactory.getLogger(Client.class);
62  
63          private final InetSocketAddress address;
64  
65          volatile Channel channel;
66          volatile boolean isRunning;
67          volatile boolean isShutdown;
68  
69          Client(InetSocketAddress address) {
70              this.address = address;
71          }
72  
73          @Override
74          public void run() {
75              final Bootstrap boot = new Bootstrap();
76              final ThreadFactory clientFactory = new DefaultThreadFactory("client");
77              final EventLoopGroup connectGroup = new MultiThreadIoEventLoopGroup(1,
78                      clientFactory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
79              try {
80                  boot.group(connectGroup)
81                          .channelFactory(NioUdtProvider.BYTE_CONNECTOR)
82                          .handler(new ChannelInitializer<UdtChannel>() {
83  
84                              @Override
85                              protected void initChannel(final UdtChannel ch)
86                                      throws Exception {
87                                  final ChannelPipeline pipeline = ch.pipeline();
88                                  pipeline.addLast("framer",
89                                          new DelimiterBasedFrameDecoder(8192,
90                                                  Delimiters.lineDelimiter()));
91                                  pipeline.addLast("decoder", new StringDecoder(
92                                          CharsetUtil.UTF_8));
93                                  pipeline.addLast("encoder", new StringEncoder(
94                                          CharsetUtil.UTF_8));
95                                  pipeline.addLast("handler", new ClientHandler());
96                              }
97                          });
98                  channel = boot.connect(address).sync().channel();
99                  isRunning = true;
100                 log.info("Client ready.");
101                 waitForRunning(false);
102                 log.info("Client closing...");
103                 channel.close().sync();
104                 isShutdown = true;
105                 log.info("Client is done.");
106             } catch (final Throwable e) {
107                 log.error("Client failed.", e);
108             } finally {
109                 connectGroup.shutdownGracefully().syncUninterruptibly();
110             }
111         }
112 
113         void shutdown() {
114             isRunning = false;
115         }
116 
117         void waitForActive(final boolean isActive) throws Exception {
118             for (int k = 0; k < WAIT_COUNT; k++) {
119                 Thread.sleep(WAIT_SLEEP);
120                 final ClientHandler handler = channel.pipeline().get(
121                         ClientHandler.class);
122                 if (handler != null && isActive == handler.isActive) {
123                     return;
124                 }
125             }
126         }
127 
128         void waitForRunning(final boolean isRunning) throws Exception {
129             for (int k = 0; k < WAIT_COUNT; k++) {
130                 if (isRunning == this.isRunning) {
131                     return;
132                 }
133                 Thread.sleep(WAIT_SLEEP);
134             }
135         }
136 
137         private void waitForShutdown() throws Exception {
138             for (int k = 0; k < WAIT_COUNT; k++) {
139                 if (isShutdown) {
140                     return;
141                 }
142                 Thread.sleep(WAIT_SLEEP);
143             }
144         }
145     }
146 
147     static class ClientHandler extends SimpleChannelInboundHandler<Object> {
148 
149         static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
150 
151         volatile boolean isActive;
152 
153         @Override
154         public void channelActive(final ChannelHandlerContext ctx)
155                 throws Exception {
156             isActive = true;
157             log.info("Client active {}", ctx.channel());
158             super.channelActive(ctx);
159         }
160 
161         @Override
162         public void channelInactive(final ChannelHandlerContext ctx)
163                 throws Exception {
164             isActive = false;
165             log.info("Client inactive {}", ctx.channel());
166             super.channelInactive(ctx);
167         }
168 
169         @Override
170         public void exceptionCaught(final ChannelHandlerContext ctx,
171                 final Throwable cause) throws Exception {
172             log.warn("Client unexpected exception from downstream.", cause);
173             ctx.close();
174         }
175 
176         @Override
177         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
178             log.info("Client received: " + msg);
179         }
180     }
181 
182     static class Server implements Runnable {
183 
184         static final Logger log = LoggerFactory.getLogger(Server.class);
185 
186         final ChannelGroup group = new DefaultChannelGroup("server group", GlobalEventExecutor.INSTANCE);
187 
188         private final InetSocketAddress address;
189 
190         volatile Channel channel;
191         volatile boolean isRunning;
192         volatile boolean isShutdown;
193 
194         Server(InetSocketAddress address) {
195             this.address = address;
196         }
197 
198         @Override
199         public void run() {
200             final ServerBootstrap boot = new ServerBootstrap();
201             final ThreadFactory factory = new DefaultThreadFactory("udp");
202             final EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(1,
203                     factory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
204             try {
205                 boot.group(eventLoopGroup)
206                         .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
207                         .childHandler(new ChannelInitializer<UdtChannel>() {
208                             @Override
209                             protected void initChannel(final UdtChannel ch)
210                                     throws Exception {
211                                 final ChannelPipeline pipeline = ch.pipeline();
212                                 pipeline.addLast("framer",
213                                         new DelimiterBasedFrameDecoder(8192,
214                                                 Delimiters.lineDelimiter()));
215                                 pipeline.addLast("decoder", new StringDecoder(
216                                         CharsetUtil.UTF_8));
217                                 pipeline.addLast("encoder", new StringEncoder(
218                                         CharsetUtil.UTF_8));
219                                 pipeline.addLast("handler", new ServerHandler(
220                                         group));
221                             }
222                         });
223                 channel = boot.bind(address).sync().channel();
224                 isRunning = true;
225                 log.info("Server ready.");
226                 waitForRunning(false);
227                 log.info("Server closing acceptor...");
228                 channel.close().sync();
229                 log.info("Server closing connectors...");
230                 group.close().sync();
231                 isShutdown = true;
232                 log.info("Server is done.");
233             } catch (final Throwable e) {
234                 log.error("Server failure.", e);
235             } finally {
236                 eventLoopGroup.shutdownGracefully();
237                 eventLoopGroup.terminationFuture().syncUninterruptibly();
238             }
239         }
240 
241         void shutdown() {
242             isRunning = false;
243         }
244 
245         void waitForActive(final boolean isActive) throws Exception {
246             for (int k = 0; k < WAIT_COUNT; k++) {
247                 Thread.sleep(WAIT_SLEEP);
248                 if (isActive) {
249                     for (final Channel channel : group) {
250                         final ServerHandler handler = channel.pipeline().get(
251                                 ServerHandler.class);
252                         if (handler != null && handler.isActive) {
253                             return;
254                         }
255                     }
256                 } else {
257                     if (group.isEmpty()) {
258                         return;
259                     }
260                 }
261             }
262         }
263 
264         void waitForRunning(final boolean isRunning) throws Exception {
265             for (int k = 0; k < WAIT_COUNT; k++) {
266                 if (isRunning == this.isRunning) {
267                     return;
268                 }
269                 Thread.sleep(WAIT_SLEEP);
270             }
271         }
272 
273         void waitForShutdown() throws Exception {
274             for (int k = 0; k < WAIT_COUNT; k++) {
275                 if (isShutdown) {
276                     return;
277                 }
278                 Thread.sleep(WAIT_SLEEP);
279             }
280         }
281     }
282 
283     static class ServerHandler extends
284             SimpleChannelInboundHandler<Object> {
285 
286         static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
287 
288         final ChannelGroup group;
289 
290         volatile boolean isActive;
291 
292         ServerHandler(final ChannelGroup group) {
293             this.group = group;
294         }
295 
296         @Override
297         public void channelActive(final ChannelHandlerContext ctx)
298                 throws Exception {
299             group.add(ctx.channel());
300             isActive = true;
301             log.info("Server active  : {}", ctx.channel());
302             super.channelActive(ctx);
303         }
304 
305         @Override
306         public void channelInactive(final ChannelHandlerContext ctx)
307                 throws Exception {
308             group.remove(ctx.channel());
309             isActive = false;
310             log.info("Server inactive: {}", ctx.channel());
311             super.channelInactive(ctx);
312         }
313 
314         @Override
315         public void exceptionCaught(final ChannelHandlerContext ctx,
316                 final Throwable cause) {
317             log.warn("Server close on exception.", cause);
318             ctx.close();
319         }
320 
321         @Override
322         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
323             log.info("Server received: " + msg);
324         }
325     }
    /** Logger used by the test method itself; the nested classes declare their own. */
    static final Logger log = LoggerFactory
            .getLogger(UDTClientServerConnectionTest.class);

    /**
     * Number of polls performed by the {@code waitFor*} helpers before they give up.
     * <p>
     * Maximum wait time is 5 seconds.
     * <p>
     * wait-time = {@code WAIT_COUNT} * {@value #WAIT_SLEEP}
     */
    static final int WAIT_COUNT = 50;
    /** Pause between two polls, in milliseconds (passed to {@link Thread#sleep(long)}). */
    static final int WAIT_SLEEP = 100;
336 
337     @BeforeAll
338     public static void assumeUdt() {
339         assumeTrue(canLoadAndInit(), "com.barchart.udt.SocketUDT can not be loaded and initialized");
340         assumeFalse(PlatformDependent.isJ9Jvm(), "Not supported on J9 JVM");
341     }
342 
343     private static boolean canLoadAndInit() {
344         try {
345             Class.forName("com.barchart.udt.SocketUDT", true,
346                     UDTClientServerConnectionTest.class.getClassLoader());
347             return true;
348         } catch (Throwable e) {
349             return false;
350         }
351     }
352 
353     /**
354      * Verify UDT client/server connect and disconnect.
355      */
356     @Test
357     public void connection() throws Exception {
358         log.info("Starting server.");
359         // Using LOCALHOST4 as UDT transport does not support IPV6 :(
360         final Server server = new Server(new InetSocketAddress(NetUtil.LOCALHOST4, 0));
361         final Thread serverTread = new Thread(server, "server-*");
362         serverTread.start();
363         server.waitForRunning(true);
364         assertTrue(server.isRunning);
365 
366         log.info("Starting client.");
367         final Client client = new Client((InetSocketAddress) server.channel.localAddress());
368         final Thread clientThread = new Thread(client, "client-*");
369         clientThread.start();
370         client.waitForRunning(true);
371         assertTrue(client.isRunning);
372 
373         log.info("Wait till connection is active.");
374         client.waitForActive(true);
375         server.waitForActive(true);
376 
377         log.info("Verify connection is active.");
378         assertEquals(1, server.group.size(), "group must have one");
379 
380         log.info("Stopping client.");
381         client.shutdown();
382         client.waitForShutdown();
383         assertTrue(client.isShutdown);
384 
385         log.info("Wait till connection is inactive.");
386         client.waitForActive(false);
387         server.waitForActive(false);
388 
389         log.info("Verify connection is inactive.");
390         assertEquals(0, server.group.size(), "group must be empty");
391 
392         log.info("Stopping server.");
393         server.shutdown();
394         server.waitForShutdown();
395         assertTrue(server.isShutdown);
396 
397         log.info("Finished server.");
398     }
399 
400 }