1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.udt;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.MultiThreadIoEventLoopGroup;
26 import io.netty.channel.SimpleChannelInboundHandler;
27 import io.netty.channel.group.ChannelGroup;
28 import io.netty.channel.group.DefaultChannelGroup;
29 import io.netty.channel.nio.NioIoHandler;
30 import io.netty.channel.udt.UdtChannel;
31 import io.netty.channel.udt.nio.NioUdtProvider;
32 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
33 import io.netty.handler.codec.Delimiters;
34 import io.netty.handler.codec.string.StringDecoder;
35 import io.netty.handler.codec.string.StringEncoder;
36 import io.netty.util.CharsetUtil;
37 import io.netty.util.NetUtil;
38 import io.netty.util.concurrent.DefaultThreadFactory;
39 import io.netty.util.concurrent.GlobalEventExecutor;
40 import io.netty.util.internal.PlatformDependent;
41 import org.junit.jupiter.api.BeforeAll;
42 import org.junit.jupiter.api.Test;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import java.net.InetSocketAddress;
47 import java.util.concurrent.ThreadFactory;
48
49 import static org.junit.jupiter.api.Assertions.assertEquals;
50 import static org.junit.jupiter.api.Assertions.assertTrue;
51 import static org.junit.jupiter.api.Assumptions.assumeFalse;
52 import static org.junit.jupiter.api.Assumptions.assumeTrue;
53
54
55
56
57 public class UDTClientServerConnectionTest {
58
59 static class Client implements Runnable {
60
61 static final Logger log = LoggerFactory.getLogger(Client.class);
62
63 private final InetSocketAddress address;
64
65 volatile Channel channel;
66 volatile boolean isRunning;
67 volatile boolean isShutdown;
68
69 Client(InetSocketAddress address) {
70 this.address = address;
71 }
72
73 @Override
74 public void run() {
75 final Bootstrap boot = new Bootstrap();
76 final ThreadFactory clientFactory = new DefaultThreadFactory("client");
77 final EventLoopGroup connectGroup = new MultiThreadIoEventLoopGroup(1,
78 clientFactory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
79 try {
80 boot.group(connectGroup)
81 .channelFactory(NioUdtProvider.BYTE_CONNECTOR)
82 .handler(new ChannelInitializer<UdtChannel>() {
83
84 @Override
85 protected void initChannel(final UdtChannel ch)
86 throws Exception {
87 final ChannelPipeline pipeline = ch.pipeline();
88 pipeline.addLast("framer",
89 new DelimiterBasedFrameDecoder(8192,
90 Delimiters.lineDelimiter()));
91 pipeline.addLast("decoder", new StringDecoder(
92 CharsetUtil.UTF_8));
93 pipeline.addLast("encoder", new StringEncoder(
94 CharsetUtil.UTF_8));
95 pipeline.addLast("handler", new ClientHandler());
96 }
97 });
98 channel = boot.connect(address).sync().channel();
99 isRunning = true;
100 log.info("Client ready.");
101 waitForRunning(false);
102 log.info("Client closing...");
103 channel.close().sync();
104 isShutdown = true;
105 log.info("Client is done.");
106 } catch (final Throwable e) {
107 log.error("Client failed.", e);
108 } finally {
109 connectGroup.shutdownGracefully().syncUninterruptibly();
110 }
111 }
112
113 void shutdown() {
114 isRunning = false;
115 }
116
117 void waitForActive(final boolean isActive) throws Exception {
118 for (int k = 0; k < WAIT_COUNT; k++) {
119 Thread.sleep(WAIT_SLEEP);
120 final ClientHandler handler = channel.pipeline().get(
121 ClientHandler.class);
122 if (handler != null && isActive == handler.isActive) {
123 return;
124 }
125 }
126 }
127
128 void waitForRunning(final boolean isRunning) throws Exception {
129 for (int k = 0; k < WAIT_COUNT; k++) {
130 if (isRunning == this.isRunning) {
131 return;
132 }
133 Thread.sleep(WAIT_SLEEP);
134 }
135 }
136
137 private void waitForShutdown() throws Exception {
138 for (int k = 0; k < WAIT_COUNT; k++) {
139 if (isShutdown) {
140 return;
141 }
142 Thread.sleep(WAIT_SLEEP);
143 }
144 }
145 }
146
147 static class ClientHandler extends SimpleChannelInboundHandler<Object> {
148
149 static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
150
151 volatile boolean isActive;
152
153 @Override
154 public void channelActive(final ChannelHandlerContext ctx)
155 throws Exception {
156 isActive = true;
157 log.info("Client active {}", ctx.channel());
158 super.channelActive(ctx);
159 }
160
161 @Override
162 public void channelInactive(final ChannelHandlerContext ctx)
163 throws Exception {
164 isActive = false;
165 log.info("Client inactive {}", ctx.channel());
166 super.channelInactive(ctx);
167 }
168
169 @Override
170 public void exceptionCaught(final ChannelHandlerContext ctx,
171 final Throwable cause) throws Exception {
172 log.warn("Client unexpected exception from downstream.", cause);
173 ctx.close();
174 }
175
176 @Override
177 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
178 log.info("Client received: " + msg);
179 }
180 }
181
182 static class Server implements Runnable {
183
184 static final Logger log = LoggerFactory.getLogger(Server.class);
185
186 final ChannelGroup group = new DefaultChannelGroup("server group", GlobalEventExecutor.INSTANCE);
187
188 private final InetSocketAddress address;
189
190 volatile Channel channel;
191 volatile boolean isRunning;
192 volatile boolean isShutdown;
193
194 Server(InetSocketAddress address) {
195 this.address = address;
196 }
197
198 @Override
199 public void run() {
200 final ServerBootstrap boot = new ServerBootstrap();
201 final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
202 final ThreadFactory serverFactory = new DefaultThreadFactory("server");
203 final EventLoopGroup acceptGroup = new MultiThreadIoEventLoopGroup(1,
204 acceptFactory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
205 final EventLoopGroup connectGroup = new MultiThreadIoEventLoopGroup(1,
206 serverFactory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
207 try {
208 boot.group(acceptGroup, connectGroup)
209 .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
210 .childHandler(new ChannelInitializer<UdtChannel>() {
211 @Override
212 protected void initChannel(final UdtChannel ch)
213 throws Exception {
214 final ChannelPipeline pipeline = ch.pipeline();
215 pipeline.addLast("framer",
216 new DelimiterBasedFrameDecoder(8192,
217 Delimiters.lineDelimiter()));
218 pipeline.addLast("decoder", new StringDecoder(
219 CharsetUtil.UTF_8));
220 pipeline.addLast("encoder", new StringEncoder(
221 CharsetUtil.UTF_8));
222 pipeline.addLast("handler", new ServerHandler(
223 group));
224 }
225 });
226 channel = boot.bind(address).sync().channel();
227 isRunning = true;
228 log.info("Server ready.");
229 waitForRunning(false);
230 log.info("Server closing acceptor...");
231 channel.close().sync();
232 log.info("Server closing connectors...");
233 group.close().sync();
234 isShutdown = true;
235 log.info("Server is done.");
236 } catch (final Throwable e) {
237 log.error("Server failure.", e);
238 } finally {
239 acceptGroup.shutdownGracefully();
240 connectGroup.shutdownGracefully();
241
242 acceptGroup.terminationFuture().syncUninterruptibly();
243 connectGroup.terminationFuture().syncUninterruptibly();
244 }
245 }
246
247 void shutdown() {
248 isRunning = false;
249 }
250
251 void waitForActive(final boolean isActive) throws Exception {
252 for (int k = 0; k < WAIT_COUNT; k++) {
253 Thread.sleep(WAIT_SLEEP);
254 if (isActive) {
255 for (final Channel channel : group) {
256 final ServerHandler handler = channel.pipeline().get(
257 ServerHandler.class);
258 if (handler != null && handler.isActive) {
259 return;
260 }
261 }
262 } else {
263 if (group.isEmpty()) {
264 return;
265 }
266 }
267 }
268 }
269
270 void waitForRunning(final boolean isRunning) throws Exception {
271 for (int k = 0; k < WAIT_COUNT; k++) {
272 if (isRunning == this.isRunning) {
273 return;
274 }
275 Thread.sleep(WAIT_SLEEP);
276 }
277 }
278
279 void waitForShutdown() throws Exception {
280 for (int k = 0; k < WAIT_COUNT; k++) {
281 if (isShutdown) {
282 return;
283 }
284 Thread.sleep(WAIT_SLEEP);
285 }
286 }
287 }
288
289 static class ServerHandler extends
290 SimpleChannelInboundHandler<Object> {
291
292 static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
293
294 final ChannelGroup group;
295
296 volatile boolean isActive;
297
298 ServerHandler(final ChannelGroup group) {
299 this.group = group;
300 }
301
302 @Override
303 public void channelActive(final ChannelHandlerContext ctx)
304 throws Exception {
305 group.add(ctx.channel());
306 isActive = true;
307 log.info("Server active : {}", ctx.channel());
308 super.channelActive(ctx);
309 }
310
311 @Override
312 public void channelInactive(final ChannelHandlerContext ctx)
313 throws Exception {
314 group.remove(ctx.channel());
315 isActive = false;
316 log.info("Server inactive: {}", ctx.channel());
317 super.channelInactive(ctx);
318 }
319
320 @Override
321 public void exceptionCaught(final ChannelHandlerContext ctx,
322 final Throwable cause) {
323 log.warn("Server close on exception.", cause);
324 ctx.close();
325 }
326
327 @Override
328 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
329 log.info("Server received: " + msg);
330 }
331 }
332 static final Logger log = LoggerFactory
333 .getLogger(UDTClientServerConnectionTest.class);
334
335
336
337
338
339
340 static final int WAIT_COUNT = 50;
341 static final int WAIT_SLEEP = 100;
342
343 @BeforeAll
344 public static void assumeUdt() {
345 assumeTrue(canLoadAndInit(), "com.barchart.udt.SocketUDT can not be loaded and initialized");
346 assumeFalse(PlatformDependent.isJ9Jvm(), "Not supported on J9 JVM");
347 }
348
349 private static boolean canLoadAndInit() {
350 try {
351 Class.forName("com.barchart.udt.SocketUDT", true,
352 UDTClientServerConnectionTest.class.getClassLoader());
353 return true;
354 } catch (Throwable e) {
355 return false;
356 }
357 }
358
359
360
361
362 @Test
363 public void connection() throws Exception {
364 log.info("Starting server.");
365
366 final Server server = new Server(new InetSocketAddress(NetUtil.LOCALHOST4, 0));
367 final Thread serverTread = new Thread(server, "server-*");
368 serverTread.start();
369 server.waitForRunning(true);
370 assertTrue(server.isRunning);
371
372 log.info("Starting client.");
373 final Client client = new Client((InetSocketAddress) server.channel.localAddress());
374 final Thread clientThread = new Thread(client, "client-*");
375 clientThread.start();
376 client.waitForRunning(true);
377 assertTrue(client.isRunning);
378
379 log.info("Wait till connection is active.");
380 client.waitForActive(true);
381 server.waitForActive(true);
382
383 log.info("Verify connection is active.");
384 assertEquals(1, server.group.size(), "group must have one");
385
386 log.info("Stopping client.");
387 client.shutdown();
388 client.waitForShutdown();
389 assertTrue(client.isShutdown);
390
391 log.info("Wait till connection is inactive.");
392 client.waitForActive(false);
393 server.waitForActive(false);
394
395 log.info("Verify connection is inactive.");
396 assertEquals(0, server.group.size(), "group must be empty");
397
398 log.info("Stopping server.");
399 server.shutdown();
400 server.waitForShutdown();
401 assertTrue(server.isShutdown);
402
403 log.info("Finished server.");
404 }
405
406 }