View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.udt;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.SimpleChannelInboundHandler;
25  import io.netty.channel.group.ChannelGroup;
26  import io.netty.channel.group.DefaultChannelGroup;
27  import io.netty.channel.nio.NioEventLoopGroup;
28  import io.netty.channel.udt.UdtChannel;
29  import io.netty.channel.udt.nio.NioUdtProvider;
30  import io.netty.handler.codec.DelimiterBasedFrameDecoder;
31  import io.netty.handler.codec.Delimiters;
32  import io.netty.handler.codec.string.StringDecoder;
33  import io.netty.handler.codec.string.StringEncoder;
34  import io.netty.util.CharsetUtil;
35  import io.netty.util.NetUtil;
36  import io.netty.util.concurrent.DefaultThreadFactory;
37  import io.netty.util.concurrent.GlobalEventExecutor;
38  import io.netty.util.internal.PlatformDependent;
39  import org.junit.jupiter.api.BeforeAll;
40  import org.junit.jupiter.api.Test;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import java.net.InetSocketAddress;
45  import java.util.concurrent.ThreadFactory;
46  
47  import static org.junit.jupiter.api.Assertions.assertEquals;
48  import static org.junit.jupiter.api.Assertions.assertTrue;
49  import static org.junit.jupiter.api.Assumptions.assumeFalse;
50  import static org.junit.jupiter.api.Assumptions.assumeTrue;
51  
52  /**
53   * Verify UDT connect/disconnect life cycle.
54   */
55  public class UDTClientServerConnectionTest {
56  
57      static class Client implements Runnable {
58  
59          static final Logger log = LoggerFactory.getLogger(Client.class);
60  
61          private final InetSocketAddress address;
62  
63          volatile Channel channel;
64          volatile boolean isRunning;
65          volatile boolean isShutdown;
66  
67          Client(InetSocketAddress address) {
68              this.address = address;
69          }
70  
71          @Override
72          public void run() {
73              final Bootstrap boot = new Bootstrap();
74              final ThreadFactory clientFactory = new DefaultThreadFactory("client");
75              final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
76                      clientFactory, NioUdtProvider.BYTE_PROVIDER);
77              try {
78                  boot.group(connectGroup)
79                          .channelFactory(NioUdtProvider.BYTE_CONNECTOR)
80                          .handler(new ChannelInitializer<UdtChannel>() {
81  
82                              @Override
83                              protected void initChannel(final UdtChannel ch)
84                                      throws Exception {
85                                  final ChannelPipeline pipeline = ch.pipeline();
86                                  pipeline.addLast("framer",
87                                          new DelimiterBasedFrameDecoder(8192,
88                                                  Delimiters.lineDelimiter()));
89                                  pipeline.addLast("decoder", new StringDecoder(
90                                          CharsetUtil.UTF_8));
91                                  pipeline.addLast("encoder", new StringEncoder(
92                                          CharsetUtil.UTF_8));
93                                  pipeline.addLast("handler", new ClientHandler());
94                              }
95                          });
96                  channel = boot.connect(address).sync().channel();
97                  isRunning = true;
98                  log.info("Client ready.");
99                  waitForRunning(false);
100                 log.info("Client closing...");
101                 channel.close().sync();
102                 isShutdown = true;
103                 log.info("Client is done.");
104             } catch (final Throwable e) {
105                 log.error("Client failed.", e);
106             } finally {
107                 connectGroup.shutdownGracefully().syncUninterruptibly();
108             }
109         }
110 
111         void shutdown() {
112             isRunning = false;
113         }
114 
115         void waitForActive(final boolean isActive) throws Exception {
116             for (int k = 0; k < WAIT_COUNT; k++) {
117                 Thread.sleep(WAIT_SLEEP);
118                 final ClientHandler handler = channel.pipeline().get(
119                         ClientHandler.class);
120                 if (handler != null && isActive == handler.isActive) {
121                     return;
122                 }
123             }
124         }
125 
126         void waitForRunning(final boolean isRunning) throws Exception {
127             for (int k = 0; k < WAIT_COUNT; k++) {
128                 if (isRunning == this.isRunning) {
129                     return;
130                 }
131                 Thread.sleep(WAIT_SLEEP);
132             }
133         }
134 
135         private void waitForShutdown() throws Exception {
136             for (int k = 0; k < WAIT_COUNT; k++) {
137                 if (isShutdown) {
138                     return;
139                 }
140                 Thread.sleep(WAIT_SLEEP);
141             }
142         }
143     }
144 
145     static class ClientHandler extends SimpleChannelInboundHandler<Object> {
146 
147         static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
148 
149         volatile boolean isActive;
150 
151         @Override
152         public void channelActive(final ChannelHandlerContext ctx)
153                 throws Exception {
154             isActive = true;
155             log.info("Client active {}", ctx.channel());
156             super.channelActive(ctx);
157         }
158 
159         @Override
160         public void channelInactive(final ChannelHandlerContext ctx)
161                 throws Exception {
162             isActive = false;
163             log.info("Client inactive {}", ctx.channel());
164             super.channelInactive(ctx);
165         }
166 
167         @Override
168         public void exceptionCaught(final ChannelHandlerContext ctx,
169                 final Throwable cause) throws Exception {
170             log.warn("Client unexpected exception from downstream.", cause);
171             ctx.close();
172         }
173 
174         @Override
175         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
176             log.info("Client received: " + msg);
177         }
178     }
179 
180     static class Server implements Runnable {
181 
182         static final Logger log = LoggerFactory.getLogger(Server.class);
183 
184         final ChannelGroup group = new DefaultChannelGroup("server group", GlobalEventExecutor.INSTANCE);
185 
186         private final InetSocketAddress address;
187 
188         volatile Channel channel;
189         volatile boolean isRunning;
190         volatile boolean isShutdown;
191 
192         Server(InetSocketAddress address) {
193             this.address = address;
194         }
195 
196         @Override
197         public void run() {
198             final ServerBootstrap boot = new ServerBootstrap();
199             final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
200             final ThreadFactory serverFactory = new DefaultThreadFactory("server");
201             final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1,
202                     acceptFactory, NioUdtProvider.BYTE_PROVIDER);
203             final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
204                     serverFactory, NioUdtProvider.BYTE_PROVIDER);
205             try {
206                 boot.group(acceptGroup, connectGroup)
207                         .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
208                         .childHandler(new ChannelInitializer<UdtChannel>() {
209                             @Override
210                             protected void initChannel(final UdtChannel ch)
211                                     throws Exception {
212                                 final ChannelPipeline pipeline = ch.pipeline();
213                                 pipeline.addLast("framer",
214                                         new DelimiterBasedFrameDecoder(8192,
215                                                 Delimiters.lineDelimiter()));
216                                 pipeline.addLast("decoder", new StringDecoder(
217                                         CharsetUtil.UTF_8));
218                                 pipeline.addLast("encoder", new StringEncoder(
219                                         CharsetUtil.UTF_8));
220                                 pipeline.addLast("handler", new ServerHandler(
221                                         group));
222                             }
223                         });
224                 channel = boot.bind(address).sync().channel();
225                 isRunning = true;
226                 log.info("Server ready.");
227                 waitForRunning(false);
228                 log.info("Server closing acceptor...");
229                 channel.close().sync();
230                 log.info("Server closing connectors...");
231                 group.close().sync();
232                 isShutdown = true;
233                 log.info("Server is done.");
234             } catch (final Throwable e) {
235                 log.error("Server failure.", e);
236             } finally {
237                 acceptGroup.shutdownGracefully();
238                 connectGroup.shutdownGracefully();
239 
240                 acceptGroup.terminationFuture().syncUninterruptibly();
241                 connectGroup.terminationFuture().syncUninterruptibly();
242             }
243         }
244 
245         void shutdown() {
246             isRunning = false;
247         }
248 
249         void waitForActive(final boolean isActive) throws Exception {
250             for (int k = 0; k < WAIT_COUNT; k++) {
251                 Thread.sleep(WAIT_SLEEP);
252                 if (isActive) {
253                     for (final Channel channel : group) {
254                         final ServerHandler handler = channel.pipeline().get(
255                                 ServerHandler.class);
256                         if (handler != null && handler.isActive) {
257                             return;
258                         }
259                     }
260                 } else {
261                     if (group.isEmpty()) {
262                         return;
263                     }
264                 }
265             }
266         }
267 
268         void waitForRunning(final boolean isRunning) throws Exception {
269             for (int k = 0; k < WAIT_COUNT; k++) {
270                 if (isRunning == this.isRunning) {
271                     return;
272                 }
273                 Thread.sleep(WAIT_SLEEP);
274             }
275         }
276 
277         void waitForShutdown() throws Exception {
278             for (int k = 0; k < WAIT_COUNT; k++) {
279                 if (isShutdown) {
280                     return;
281                 }
282                 Thread.sleep(WAIT_SLEEP);
283             }
284         }
285     }
286 
287     static class ServerHandler extends
288             SimpleChannelInboundHandler<Object> {
289 
290         static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
291 
292         final ChannelGroup group;
293 
294         volatile boolean isActive;
295 
296         ServerHandler(final ChannelGroup group) {
297             this.group = group;
298         }
299 
300         @Override
301         public void channelActive(final ChannelHandlerContext ctx)
302                 throws Exception {
303             group.add(ctx.channel());
304             isActive = true;
305             log.info("Server active  : {}", ctx.channel());
306             super.channelActive(ctx);
307         }
308 
309         @Override
310         public void channelInactive(final ChannelHandlerContext ctx)
311                 throws Exception {
312             group.remove(ctx.channel());
313             isActive = false;
314             log.info("Server inactive: {}", ctx.channel());
315             super.channelInactive(ctx);
316         }
317 
318         @Override
319         public void exceptionCaught(final ChannelHandlerContext ctx,
320                 final Throwable cause) {
321             log.warn("Server close on exception.", cause);
322             ctx.close();
323         }
324 
325         @Override
326         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
327             log.info("Server received: " + msg);
328         }
329     }
330     static final Logger log = LoggerFactory
331             .getLogger(UDTClientServerConnectionTest.class);
332 
333     /**
334      * Maximum wait time is 5 seconds.
335      * <p>
336      * wait-time = {@code WAIT_COUNT} * {@value #WAIT_SLEEP}
337      */
338     static final int WAIT_COUNT = 50;
339     static final int WAIT_SLEEP = 100;
340 
341     @BeforeAll
342     public static void assumeUdt() {
343         assumeTrue(canLoadAndInit(), "com.barchart.udt.SocketUDT can not be loaded and initialized");
344         assumeFalse(PlatformDependent.isJ9Jvm(), "Not supported on J9 JVM");
345     }
346 
347     private static boolean canLoadAndInit() {
348         try {
349             Class.forName("com.barchart.udt.SocketUDT", true,
350                     UDTClientServerConnectionTest.class.getClassLoader());
351             return true;
352         } catch (Throwable e) {
353             return false;
354         }
355     }
356 
357     /**
358      * Verify UDT client/server connect and disconnect.
359      */
360     @Test
361     public void connection() throws Exception {
362         log.info("Starting server.");
363         // Using LOCALHOST4 as UDT transport does not support IPV6 :(
364         final Server server = new Server(new InetSocketAddress(NetUtil.LOCALHOST4, 0));
365         final Thread serverTread = new Thread(server, "server-*");
366         serverTread.start();
367         server.waitForRunning(true);
368         assertTrue(server.isRunning);
369 
370         log.info("Starting client.");
371         final Client client = new Client((InetSocketAddress) server.channel.localAddress());
372         final Thread clientThread = new Thread(client, "client-*");
373         clientThread.start();
374         client.waitForRunning(true);
375         assertTrue(client.isRunning);
376 
377         log.info("Wait till connection is active.");
378         client.waitForActive(true);
379         server.waitForActive(true);
380 
381         log.info("Verify connection is active.");
382         assertEquals(1, server.group.size(), "group must have one");
383 
384         log.info("Stopping client.");
385         client.shutdown();
386         client.waitForShutdown();
387         assertTrue(client.isShutdown);
388 
389         log.info("Wait till connection is inactive.");
390         client.waitForActive(false);
391         server.waitForActive(false);
392 
393         log.info("Verify connection is inactive.");
394         assertEquals(0, server.group.size(), "group must be empty");
395 
396         log.info("Stopping server.");
397         server.shutdown();
398         server.waitForShutdown();
399         assertTrue(server.isShutdown);
400 
401         log.info("Finished server.");
402     }
403 
404 }