View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.udt;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.SimpleChannelInboundHandler;
25  import io.netty.channel.group.ChannelGroup;
26  import io.netty.channel.group.DefaultChannelGroup;
27  import io.netty.channel.nio.NioEventLoopGroup;
28  import io.netty.channel.udt.UdtChannel;
29  import io.netty.channel.udt.nio.NioUdtProvider;
30  import io.netty.handler.codec.DelimiterBasedFrameDecoder;
31  import io.netty.handler.codec.Delimiters;
32  import io.netty.handler.codec.string.StringDecoder;
33  import io.netty.handler.codec.string.StringEncoder;
34  import io.netty.util.CharsetUtil;
35  import io.netty.util.NetUtil;
36  import io.netty.util.concurrent.DefaultThreadFactory;
37  import io.netty.util.concurrent.GlobalEventExecutor;
38  import io.netty.util.internal.PlatformDependent;
39  import org.junit.jupiter.api.BeforeAll;
40  import org.junit.jupiter.api.Test;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import java.net.InetSocketAddress;
45  import java.util.concurrent.ThreadFactory;
46  
47  import static org.junit.jupiter.api.Assertions.assertEquals;
48  import static org.junit.jupiter.api.Assertions.assertTrue;
49  import static org.junit.jupiter.api.Assumptions.assumeFalse;
50  import static org.junit.jupiter.api.Assumptions.assumeTrue;
51  
52  /**
53   * Verify UDT connect/disconnect life cycle.
54   */
55  public class UDTClientServerConnectionTest {
56  
57      static class Client implements Runnable {
58  
59          static final Logger log = LoggerFactory.getLogger(Client.class);
60  
61          private final InetSocketAddress address;
62  
63          volatile Channel channel;
64          volatile boolean isRunning;
65          volatile boolean isShutdown;
66  
67          Client(InetSocketAddress address) {
68              this.address = address;
69          }
70  
71          @Override
72          public void run() {
73              final Bootstrap boot = new Bootstrap();
74              final ThreadFactory clientFactory = new DefaultThreadFactory("client");
75              final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
76                      clientFactory, NioUdtProvider.BYTE_PROVIDER);
77              try {
78                  boot.group(connectGroup)
79                          .channelFactory(NioUdtProvider.BYTE_CONNECTOR)
80                          .handler(new ChannelInitializer<UdtChannel>() {
81  
82                              @Override
83                              protected void initChannel(final UdtChannel ch)
84                                      throws Exception {
85                                  final ChannelPipeline pipeline = ch.pipeline();
86                                  pipeline.addLast("framer",
87                                          new DelimiterBasedFrameDecoder(8192,
88                                                  Delimiters.lineDelimiter()));
89                                  pipeline.addLast("decoder", new StringDecoder(
90                                          CharsetUtil.UTF_8));
91                                  pipeline.addLast("encoder", new StringEncoder(
92                                          CharsetUtil.UTF_8));
93                                  pipeline.addLast("handler", new ClientHandler());
94                              }
95                          });
96                  channel = boot.connect(address).sync().channel();
97                  isRunning = true;
98                  log.info("Client ready.");
99                  waitForRunning(false);
100                 log.info("Client closing...");
101                 channel.close().sync();
102                 isShutdown = true;
103                 log.info("Client is done.");
104             } catch (final Throwable e) {
105                 log.error("Client failed.", e);
106             } finally {
107                 connectGroup.shutdownGracefully().syncUninterruptibly();
108             }
109         }
110 
111         void shutdown() {
112             isRunning = false;
113         }
114 
115         void waitForActive(final boolean isActive) throws Exception {
116             for (int k = 0; k < WAIT_COUNT; k++) {
117                 Thread.sleep(WAIT_SLEEP);
118                 final ClientHandler handler = channel.pipeline().get(
119                         ClientHandler.class);
120                 if (handler != null && isActive == handler.isActive) {
121                     return;
122                 }
123             }
124         }
125 
126         void waitForRunning(final boolean isRunning) throws Exception {
127             for (int k = 0; k < WAIT_COUNT; k++) {
128                 if (isRunning == this.isRunning) {
129                     return;
130                 }
131                 Thread.sleep(WAIT_SLEEP);
132             }
133         }
134 
135         private void waitForShutdown() throws Exception {
136             for (int k = 0; k < WAIT_COUNT; k++) {
137                 if (isShutdown) {
138                     return;
139                 }
140                 Thread.sleep(WAIT_SLEEP);
141             }
142         }
143     }
144 
145     static class ClientHandler extends SimpleChannelInboundHandler<Object> {
146 
147         static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
148 
149         volatile boolean isActive;
150 
151         @Override
152         public void channelActive(final ChannelHandlerContext ctx)
153                 throws Exception {
154             isActive = true;
155             log.info("Client active {}", ctx.channel());
156             super.channelActive(ctx);
157         }
158 
159         @Override
160         public void channelInactive(final ChannelHandlerContext ctx)
161                 throws Exception {
162             isActive = false;
163             log.info("Client inactive {}", ctx.channel());
164             super.channelInactive(ctx);
165         }
166 
167         @Override
168         public void exceptionCaught(final ChannelHandlerContext ctx,
169                 final Throwable cause) throws Exception {
170             log.warn("Client unexpected exception from downstream.", cause);
171             ctx.close();
172         }
173 
174         @Override
175         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
176             log.info("Client received: " + msg);
177         }
178     }
179 
180     static class Server implements Runnable {
181 
182         static final Logger log = LoggerFactory.getLogger(Server.class);
183 
184         final ChannelGroup group = new DefaultChannelGroup("server group", GlobalEventExecutor.INSTANCE);
185 
186         private final InetSocketAddress address;
187 
188         volatile Channel channel;
189         volatile boolean isRunning;
190         volatile boolean isShutdown;
191 
192         Server(InetSocketAddress address) {
193             this.address = address;
194         }
195 
196         @Override
197         public void run() {
198             final ServerBootstrap boot = new ServerBootstrap();
199             final ThreadFactory factory = new DefaultThreadFactory("udp");
200             final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1,
201                     factory, NioUdtProvider.BYTE_PROVIDER);
202             try {
203                 boot.group(eventLoopGroup)
204                         .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
205                         .childHandler(new ChannelInitializer<UdtChannel>() {
206                             @Override
207                             protected void initChannel(final UdtChannel ch)
208                                     throws Exception {
209                                 final ChannelPipeline pipeline = ch.pipeline();
210                                 pipeline.addLast("framer",
211                                         new DelimiterBasedFrameDecoder(8192,
212                                                 Delimiters.lineDelimiter()));
213                                 pipeline.addLast("decoder", new StringDecoder(
214                                         CharsetUtil.UTF_8));
215                                 pipeline.addLast("encoder", new StringEncoder(
216                                         CharsetUtil.UTF_8));
217                                 pipeline.addLast("handler", new ServerHandler(
218                                         group));
219                             }
220                         });
221                 channel = boot.bind(address).sync().channel();
222                 isRunning = true;
223                 log.info("Server ready.");
224                 waitForRunning(false);
225                 log.info("Server closing acceptor...");
226                 channel.close().sync();
227                 log.info("Server closing connectors...");
228                 group.close().sync();
229                 isShutdown = true;
230                 log.info("Server is done.");
231             } catch (final Throwable e) {
232                 log.error("Server failure.", e);
233             } finally {
234                 eventLoopGroup.shutdownGracefully();
235                 eventLoopGroup.terminationFuture().syncUninterruptibly();
236             }
237         }
238 
239         void shutdown() {
240             isRunning = false;
241         }
242 
243         void waitForActive(final boolean isActive) throws Exception {
244             for (int k = 0; k < WAIT_COUNT; k++) {
245                 Thread.sleep(WAIT_SLEEP);
246                 if (isActive) {
247                     for (final Channel channel : group) {
248                         final ServerHandler handler = channel.pipeline().get(
249                                 ServerHandler.class);
250                         if (handler != null && handler.isActive) {
251                             return;
252                         }
253                     }
254                 } else {
255                     if (group.isEmpty()) {
256                         return;
257                     }
258                 }
259             }
260         }
261 
262         void waitForRunning(final boolean isRunning) throws Exception {
263             for (int k = 0; k < WAIT_COUNT; k++) {
264                 if (isRunning == this.isRunning) {
265                     return;
266                 }
267                 Thread.sleep(WAIT_SLEEP);
268             }
269         }
270 
271         void waitForShutdown() throws Exception {
272             for (int k = 0; k < WAIT_COUNT; k++) {
273                 if (isShutdown) {
274                     return;
275                 }
276                 Thread.sleep(WAIT_SLEEP);
277             }
278         }
279     }
280 
281     static class ServerHandler extends
282             SimpleChannelInboundHandler<Object> {
283 
284         static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
285 
286         final ChannelGroup group;
287 
288         volatile boolean isActive;
289 
290         ServerHandler(final ChannelGroup group) {
291             this.group = group;
292         }
293 
294         @Override
295         public void channelActive(final ChannelHandlerContext ctx)
296                 throws Exception {
297             group.add(ctx.channel());
298             isActive = true;
299             log.info("Server active  : {}", ctx.channel());
300             super.channelActive(ctx);
301         }
302 
303         @Override
304         public void channelInactive(final ChannelHandlerContext ctx)
305                 throws Exception {
306             group.remove(ctx.channel());
307             isActive = false;
308             log.info("Server inactive: {}", ctx.channel());
309             super.channelInactive(ctx);
310         }
311 
312         @Override
313         public void exceptionCaught(final ChannelHandlerContext ctx,
314                 final Throwable cause) {
315             log.warn("Server close on exception.", cause);
316             ctx.close();
317         }
318 
319         @Override
320         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
321             log.info("Server received: " + msg);
322         }
323     }
    /** Logger used by the test method itself; each nested class declares its own. */
    static final Logger log = LoggerFactory
            .getLogger(UDTClientServerConnectionTest.class);

    /**
     * Number of polling attempts made by the {@code waitFor...} helpers before
     * they give up.
     * <p>
     * Maximum wait time is 5 seconds.
     * <p>
     * wait-time = {@code WAIT_COUNT} * {@value #WAIT_SLEEP}
     */
    static final int WAIT_COUNT = 50;
    /** Pause between two polling attempts, passed to {@link Thread#sleep(long)} (milliseconds). */
    static final int WAIT_SLEEP = 100;
334 
335     @BeforeAll
336     public static void assumeUdt() {
337         assumeTrue(canLoadAndInit(), "com.barchart.udt.SocketUDT can not be loaded and initialized");
338         assumeFalse(PlatformDependent.isJ9Jvm(), "Not supported on J9 JVM");
339     }
340 
341     private static boolean canLoadAndInit() {
342         try {
343             Class.forName("com.barchart.udt.SocketUDT", true,
344                     UDTClientServerConnectionTest.class.getClassLoader());
345             return true;
346         } catch (Throwable e) {
347             return false;
348         }
349     }
350 
351     /**
352      * Verify UDT client/server connect and disconnect.
353      */
354     @Test
355     public void connection() throws Exception {
356         log.info("Starting server.");
357         // Using LOCALHOST4 as UDT transport does not support IPV6 :(
358         final Server server = new Server(new InetSocketAddress(NetUtil.LOCALHOST4, 0));
359         final Thread serverTread = new Thread(server, "server-*");
360         serverTread.start();
361         server.waitForRunning(true);
362         assertTrue(server.isRunning);
363 
364         log.info("Starting client.");
365         final Client client = new Client((InetSocketAddress) server.channel.localAddress());
366         final Thread clientThread = new Thread(client, "client-*");
367         clientThread.start();
368         client.waitForRunning(true);
369         assertTrue(client.isRunning);
370 
371         log.info("Wait till connection is active.");
372         client.waitForActive(true);
373         server.waitForActive(true);
374 
375         log.info("Verify connection is active.");
376         assertEquals(1, server.group.size(), "group must have one");
377 
378         log.info("Stopping client.");
379         client.shutdown();
380         client.waitForShutdown();
381         assertTrue(client.isShutdown);
382 
383         log.info("Wait till connection is inactive.");
384         client.waitForActive(false);
385         server.waitForActive(false);
386 
387         log.info("Verify connection is inactive.");
388         assertEquals(0, server.group.size(), "group must be empty");
389 
390         log.info("Stopping server.");
391         server.shutdown();
392         server.waitForShutdown();
393         assertTrue(server.isShutdown);
394 
395         log.info("Finished server.");
396     }
397 
398 }