1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.udt;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.MultiThreadIoEventLoopGroup;
26 import io.netty.channel.SimpleChannelInboundHandler;
27 import io.netty.channel.group.ChannelGroup;
28 import io.netty.channel.group.DefaultChannelGroup;
29 import io.netty.channel.nio.NioIoHandler;
30 import io.netty.channel.udt.UdtChannel;
31 import io.netty.channel.udt.nio.NioUdtProvider;
32 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
33 import io.netty.handler.codec.Delimiters;
34 import io.netty.handler.codec.string.StringDecoder;
35 import io.netty.handler.codec.string.StringEncoder;
36 import io.netty.util.CharsetUtil;
37 import io.netty.util.NetUtil;
38 import io.netty.util.concurrent.DefaultThreadFactory;
39 import io.netty.util.concurrent.GlobalEventExecutor;
40 import io.netty.util.internal.PlatformDependent;
41 import org.junit.jupiter.api.BeforeAll;
42 import org.junit.jupiter.api.Test;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import java.net.InetSocketAddress;
47 import java.util.concurrent.ThreadFactory;
48
49 import static org.junit.jupiter.api.Assertions.assertEquals;
50 import static org.junit.jupiter.api.Assertions.assertTrue;
51 import static org.junit.jupiter.api.Assumptions.assumeFalse;
52 import static org.junit.jupiter.api.Assumptions.assumeTrue;
53
54
55
56
57 public class UDTClientServerConnectionTest {
58
59 static class Client implements Runnable {
60
61 static final Logger log = LoggerFactory.getLogger(Client.class);
62
63 private final InetSocketAddress address;
64
65 volatile Channel channel;
66 volatile boolean isRunning;
67 volatile boolean isShutdown;
68
69 Client(InetSocketAddress address) {
70 this.address = address;
71 }
72
73 @Override
74 public void run() {
75 final Bootstrap boot = new Bootstrap();
76 final ThreadFactory clientFactory = new DefaultThreadFactory("client");
77 final EventLoopGroup connectGroup = new MultiThreadIoEventLoopGroup(1,
78 clientFactory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
79 try {
80 boot.group(connectGroup)
81 .channelFactory(NioUdtProvider.BYTE_CONNECTOR)
82 .handler(new ChannelInitializer<UdtChannel>() {
83
84 @Override
85 protected void initChannel(final UdtChannel ch)
86 throws Exception {
87 final ChannelPipeline pipeline = ch.pipeline();
88 pipeline.addLast("framer",
89 new DelimiterBasedFrameDecoder(8192,
90 Delimiters.lineDelimiter()));
91 pipeline.addLast("decoder", new StringDecoder(
92 CharsetUtil.UTF_8));
93 pipeline.addLast("encoder", new StringEncoder(
94 CharsetUtil.UTF_8));
95 pipeline.addLast("handler", new ClientHandler());
96 }
97 });
98 channel = boot.connect(address).sync().channel();
99 isRunning = true;
100 log.info("Client ready.");
101 waitForRunning(false);
102 log.info("Client closing...");
103 channel.close().sync();
104 isShutdown = true;
105 log.info("Client is done.");
106 } catch (final Throwable e) {
107 log.error("Client failed.", e);
108 } finally {
109 connectGroup.shutdownGracefully().syncUninterruptibly();
110 }
111 }
112
113 void shutdown() {
114 isRunning = false;
115 }
116
117 void waitForActive(final boolean isActive) throws Exception {
118 for (int k = 0; k < WAIT_COUNT; k++) {
119 Thread.sleep(WAIT_SLEEP);
120 final ClientHandler handler = channel.pipeline().get(
121 ClientHandler.class);
122 if (handler != null && isActive == handler.isActive) {
123 return;
124 }
125 }
126 }
127
128 void waitForRunning(final boolean isRunning) throws Exception {
129 for (int k = 0; k < WAIT_COUNT; k++) {
130 if (isRunning == this.isRunning) {
131 return;
132 }
133 Thread.sleep(WAIT_SLEEP);
134 }
135 }
136
137 private void waitForShutdown() throws Exception {
138 for (int k = 0; k < WAIT_COUNT; k++) {
139 if (isShutdown) {
140 return;
141 }
142 Thread.sleep(WAIT_SLEEP);
143 }
144 }
145 }
146
147 static class ClientHandler extends SimpleChannelInboundHandler<Object> {
148
149 static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
150
151 volatile boolean isActive;
152
153 @Override
154 public void channelActive(final ChannelHandlerContext ctx)
155 throws Exception {
156 isActive = true;
157 log.info("Client active {}", ctx.channel());
158 super.channelActive(ctx);
159 }
160
161 @Override
162 public void channelInactive(final ChannelHandlerContext ctx)
163 throws Exception {
164 isActive = false;
165 log.info("Client inactive {}", ctx.channel());
166 super.channelInactive(ctx);
167 }
168
169 @Override
170 public void exceptionCaught(final ChannelHandlerContext ctx,
171 final Throwable cause) throws Exception {
172 log.warn("Client unexpected exception from downstream.", cause);
173 ctx.close();
174 }
175
176 @Override
177 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
178 log.info("Client received: " + msg);
179 }
180 }
181
182 static class Server implements Runnable {
183
184 static final Logger log = LoggerFactory.getLogger(Server.class);
185
186 final ChannelGroup group = new DefaultChannelGroup("server group", GlobalEventExecutor.INSTANCE);
187
188 private final InetSocketAddress address;
189
190 volatile Channel channel;
191 volatile boolean isRunning;
192 volatile boolean isShutdown;
193
194 Server(InetSocketAddress address) {
195 this.address = address;
196 }
197
198 @Override
199 public void run() {
200 final ServerBootstrap boot = new ServerBootstrap();
201 final ThreadFactory factory = new DefaultThreadFactory("udp");
202 final EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(1,
203 factory, NioIoHandler.newFactory(NioUdtProvider.BYTE_PROVIDER));
204 try {
205 boot.group(eventLoopGroup)
206 .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
207 .childHandler(new ChannelInitializer<UdtChannel>() {
208 @Override
209 protected void initChannel(final UdtChannel ch)
210 throws Exception {
211 final ChannelPipeline pipeline = ch.pipeline();
212 pipeline.addLast("framer",
213 new DelimiterBasedFrameDecoder(8192,
214 Delimiters.lineDelimiter()));
215 pipeline.addLast("decoder", new StringDecoder(
216 CharsetUtil.UTF_8));
217 pipeline.addLast("encoder", new StringEncoder(
218 CharsetUtil.UTF_8));
219 pipeline.addLast("handler", new ServerHandler(
220 group));
221 }
222 });
223 channel = boot.bind(address).sync().channel();
224 isRunning = true;
225 log.info("Server ready.");
226 waitForRunning(false);
227 log.info("Server closing acceptor...");
228 channel.close().sync();
229 log.info("Server closing connectors...");
230 group.close().sync();
231 isShutdown = true;
232 log.info("Server is done.");
233 } catch (final Throwable e) {
234 log.error("Server failure.", e);
235 } finally {
236 eventLoopGroup.shutdownGracefully();
237 eventLoopGroup.terminationFuture().syncUninterruptibly();
238 }
239 }
240
241 void shutdown() {
242 isRunning = false;
243 }
244
245 void waitForActive(final boolean isActive) throws Exception {
246 for (int k = 0; k < WAIT_COUNT; k++) {
247 Thread.sleep(WAIT_SLEEP);
248 if (isActive) {
249 for (final Channel channel : group) {
250 final ServerHandler handler = channel.pipeline().get(
251 ServerHandler.class);
252 if (handler != null && handler.isActive) {
253 return;
254 }
255 }
256 } else {
257 if (group.isEmpty()) {
258 return;
259 }
260 }
261 }
262 }
263
264 void waitForRunning(final boolean isRunning) throws Exception {
265 for (int k = 0; k < WAIT_COUNT; k++) {
266 if (isRunning == this.isRunning) {
267 return;
268 }
269 Thread.sleep(WAIT_SLEEP);
270 }
271 }
272
273 void waitForShutdown() throws Exception {
274 for (int k = 0; k < WAIT_COUNT; k++) {
275 if (isShutdown) {
276 return;
277 }
278 Thread.sleep(WAIT_SLEEP);
279 }
280 }
281 }
282
283 static class ServerHandler extends
284 SimpleChannelInboundHandler<Object> {
285
286 static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
287
288 final ChannelGroup group;
289
290 volatile boolean isActive;
291
292 ServerHandler(final ChannelGroup group) {
293 this.group = group;
294 }
295
296 @Override
297 public void channelActive(final ChannelHandlerContext ctx)
298 throws Exception {
299 group.add(ctx.channel());
300 isActive = true;
301 log.info("Server active : {}", ctx.channel());
302 super.channelActive(ctx);
303 }
304
305 @Override
306 public void channelInactive(final ChannelHandlerContext ctx)
307 throws Exception {
308 group.remove(ctx.channel());
309 isActive = false;
310 log.info("Server inactive: {}", ctx.channel());
311 super.channelInactive(ctx);
312 }
313
314 @Override
315 public void exceptionCaught(final ChannelHandlerContext ctx,
316 final Throwable cause) {
317 log.warn("Server close on exception.", cause);
318 ctx.close();
319 }
320
321 @Override
322 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
323 log.info("Server received: " + msg);
324 }
325 }
326 static final Logger log = LoggerFactory
327 .getLogger(UDTClientServerConnectionTest.class);
328
329
330
331
332
333
334 static final int WAIT_COUNT = 50;
335 static final int WAIT_SLEEP = 100;
336
337 @BeforeAll
338 public static void assumeUdt() {
339 assumeTrue(canLoadAndInit(), "com.barchart.udt.SocketUDT can not be loaded and initialized");
340 assumeFalse(PlatformDependent.isJ9Jvm(), "Not supported on J9 JVM");
341 }
342
343 private static boolean canLoadAndInit() {
344 try {
345 Class.forName("com.barchart.udt.SocketUDT", true,
346 UDTClientServerConnectionTest.class.getClassLoader());
347 return true;
348 } catch (Throwable e) {
349 return false;
350 }
351 }
352
353
354
355
356 @Test
357 public void connection() throws Exception {
358 log.info("Starting server.");
359
360 final Server server = new Server(new InetSocketAddress(NetUtil.LOCALHOST4, 0));
361 final Thread serverTread = new Thread(server, "server-*");
362 serverTread.start();
363 server.waitForRunning(true);
364 assertTrue(server.isRunning);
365
366 log.info("Starting client.");
367 final Client client = new Client((InetSocketAddress) server.channel.localAddress());
368 final Thread clientThread = new Thread(client, "client-*");
369 clientThread.start();
370 client.waitForRunning(true);
371 assertTrue(client.isRunning);
372
373 log.info("Wait till connection is active.");
374 client.waitForActive(true);
375 server.waitForActive(true);
376
377 log.info("Verify connection is active.");
378 assertEquals(1, server.group.size(), "group must have one");
379
380 log.info("Stopping client.");
381 client.shutdown();
382 client.waitForShutdown();
383 assertTrue(client.isShutdown);
384
385 log.info("Wait till connection is inactive.");
386 client.waitForActive(false);
387 server.waitForActive(false);
388
389 log.info("Verify connection is inactive.");
390 assertEquals(0, server.group.size(), "group must be empty");
391
392 log.info("Stopping server.");
393 server.shutdown();
394 server.waitForShutdown();
395 assertTrue(server.isShutdown);
396
397 log.info("Finished server.");
398 }
399
400 }