1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.udt;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.channel.group.ChannelGroup;
26 import io.netty.channel.group.DefaultChannelGroup;
27 import io.netty.channel.nio.NioEventLoopGroup;
28 import io.netty.channel.udt.UdtChannel;
29 import io.netty.channel.udt.nio.NioUdtProvider;
30 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
31 import io.netty.handler.codec.Delimiters;
32 import io.netty.handler.codec.string.StringDecoder;
33 import io.netty.handler.codec.string.StringEncoder;
34 import io.netty.util.CharsetUtil;
35 import io.netty.util.NetUtil;
36 import io.netty.util.concurrent.DefaultThreadFactory;
37 import io.netty.util.concurrent.GlobalEventExecutor;
38 import io.netty.util.internal.PlatformDependent;
39 import org.junit.jupiter.api.BeforeAll;
40 import org.junit.jupiter.api.Test;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.net.InetSocketAddress;
45 import java.util.concurrent.ThreadFactory;
46
47 import static org.junit.jupiter.api.Assertions.assertEquals;
48 import static org.junit.jupiter.api.Assertions.assertTrue;
49 import static org.junit.jupiter.api.Assumptions.assumeFalse;
50 import static org.junit.jupiter.api.Assumptions.assumeTrue;
51
52
53
54
55 public class UDTClientServerConnectionTest {
56
57 static class Client implements Runnable {
58
59 static final Logger log = LoggerFactory.getLogger(Client.class);
60
61 private final InetSocketAddress address;
62
63 volatile Channel channel;
64 volatile boolean isRunning;
65 volatile boolean isShutdown;
66
67 Client(InetSocketAddress address) {
68 this.address = address;
69 }
70
71 @Override
72 public void run() {
73 final Bootstrap boot = new Bootstrap();
74 final ThreadFactory clientFactory = new DefaultThreadFactory("client");
75 final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
76 clientFactory, NioUdtProvider.BYTE_PROVIDER);
77 try {
78 boot.group(connectGroup)
79 .channelFactory(NioUdtProvider.BYTE_CONNECTOR)
80 .handler(new ChannelInitializer<UdtChannel>() {
81
82 @Override
83 protected void initChannel(final UdtChannel ch)
84 throws Exception {
85 final ChannelPipeline pipeline = ch.pipeline();
86 pipeline.addLast("framer",
87 new DelimiterBasedFrameDecoder(8192,
88 Delimiters.lineDelimiter()));
89 pipeline.addLast("decoder", new StringDecoder(
90 CharsetUtil.UTF_8));
91 pipeline.addLast("encoder", new StringEncoder(
92 CharsetUtil.UTF_8));
93 pipeline.addLast("handler", new ClientHandler());
94 }
95 });
96 channel = boot.connect(address).sync().channel();
97 isRunning = true;
98 log.info("Client ready.");
99 waitForRunning(false);
100 log.info("Client closing...");
101 channel.close().sync();
102 isShutdown = true;
103 log.info("Client is done.");
104 } catch (final Throwable e) {
105 log.error("Client failed.", e);
106 } finally {
107 connectGroup.shutdownGracefully().syncUninterruptibly();
108 }
109 }
110
111 void shutdown() {
112 isRunning = false;
113 }
114
115 void waitForActive(final boolean isActive) throws Exception {
116 for (int k = 0; k < WAIT_COUNT; k++) {
117 Thread.sleep(WAIT_SLEEP);
118 final ClientHandler handler = channel.pipeline().get(
119 ClientHandler.class);
120 if (handler != null && isActive == handler.isActive) {
121 return;
122 }
123 }
124 }
125
126 void waitForRunning(final boolean isRunning) throws Exception {
127 for (int k = 0; k < WAIT_COUNT; k++) {
128 if (isRunning == this.isRunning) {
129 return;
130 }
131 Thread.sleep(WAIT_SLEEP);
132 }
133 }
134
135 private void waitForShutdown() throws Exception {
136 for (int k = 0; k < WAIT_COUNT; k++) {
137 if (isShutdown) {
138 return;
139 }
140 Thread.sleep(WAIT_SLEEP);
141 }
142 }
143 }
144
145 static class ClientHandler extends SimpleChannelInboundHandler<Object> {
146
147 static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
148
149 volatile boolean isActive;
150
151 @Override
152 public void channelActive(final ChannelHandlerContext ctx)
153 throws Exception {
154 isActive = true;
155 log.info("Client active {}", ctx.channel());
156 super.channelActive(ctx);
157 }
158
159 @Override
160 public void channelInactive(final ChannelHandlerContext ctx)
161 throws Exception {
162 isActive = false;
163 log.info("Client inactive {}", ctx.channel());
164 super.channelInactive(ctx);
165 }
166
167 @Override
168 public void exceptionCaught(final ChannelHandlerContext ctx,
169 final Throwable cause) throws Exception {
170 log.warn("Client unexpected exception from downstream.", cause);
171 ctx.close();
172 }
173
174 @Override
175 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
176 log.info("Client received: " + msg);
177 }
178 }
179
180 static class Server implements Runnable {
181
182 static final Logger log = LoggerFactory.getLogger(Server.class);
183
184 final ChannelGroup group = new DefaultChannelGroup("server group", GlobalEventExecutor.INSTANCE);
185
186 private final InetSocketAddress address;
187
188 volatile Channel channel;
189 volatile boolean isRunning;
190 volatile boolean isShutdown;
191
192 Server(InetSocketAddress address) {
193 this.address = address;
194 }
195
196 @Override
197 public void run() {
198 final ServerBootstrap boot = new ServerBootstrap();
199 final ThreadFactory factory = new DefaultThreadFactory("udp");
200 final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1,
201 factory, NioUdtProvider.BYTE_PROVIDER);
202 try {
203 boot.group(eventLoopGroup)
204 .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
205 .childHandler(new ChannelInitializer<UdtChannel>() {
206 @Override
207 protected void initChannel(final UdtChannel ch)
208 throws Exception {
209 final ChannelPipeline pipeline = ch.pipeline();
210 pipeline.addLast("framer",
211 new DelimiterBasedFrameDecoder(8192,
212 Delimiters.lineDelimiter()));
213 pipeline.addLast("decoder", new StringDecoder(
214 CharsetUtil.UTF_8));
215 pipeline.addLast("encoder", new StringEncoder(
216 CharsetUtil.UTF_8));
217 pipeline.addLast("handler", new ServerHandler(
218 group));
219 }
220 });
221 channel = boot.bind(address).sync().channel();
222 isRunning = true;
223 log.info("Server ready.");
224 waitForRunning(false);
225 log.info("Server closing acceptor...");
226 channel.close().sync();
227 log.info("Server closing connectors...");
228 group.close().sync();
229 isShutdown = true;
230 log.info("Server is done.");
231 } catch (final Throwable e) {
232 log.error("Server failure.", e);
233 } finally {
234 eventLoopGroup.shutdownGracefully();
235 eventLoopGroup.terminationFuture().syncUninterruptibly();
236 }
237 }
238
239 void shutdown() {
240 isRunning = false;
241 }
242
243 void waitForActive(final boolean isActive) throws Exception {
244 for (int k = 0; k < WAIT_COUNT; k++) {
245 Thread.sleep(WAIT_SLEEP);
246 if (isActive) {
247 for (final Channel channel : group) {
248 final ServerHandler handler = channel.pipeline().get(
249 ServerHandler.class);
250 if (handler != null && handler.isActive) {
251 return;
252 }
253 }
254 } else {
255 if (group.isEmpty()) {
256 return;
257 }
258 }
259 }
260 }
261
262 void waitForRunning(final boolean isRunning) throws Exception {
263 for (int k = 0; k < WAIT_COUNT; k++) {
264 if (isRunning == this.isRunning) {
265 return;
266 }
267 Thread.sleep(WAIT_SLEEP);
268 }
269 }
270
271 void waitForShutdown() throws Exception {
272 for (int k = 0; k < WAIT_COUNT; k++) {
273 if (isShutdown) {
274 return;
275 }
276 Thread.sleep(WAIT_SLEEP);
277 }
278 }
279 }
280
281 static class ServerHandler extends
282 SimpleChannelInboundHandler<Object> {
283
284 static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
285
286 final ChannelGroup group;
287
288 volatile boolean isActive;
289
290 ServerHandler(final ChannelGroup group) {
291 this.group = group;
292 }
293
294 @Override
295 public void channelActive(final ChannelHandlerContext ctx)
296 throws Exception {
297 group.add(ctx.channel());
298 isActive = true;
299 log.info("Server active : {}", ctx.channel());
300 super.channelActive(ctx);
301 }
302
303 @Override
304 public void channelInactive(final ChannelHandlerContext ctx)
305 throws Exception {
306 group.remove(ctx.channel());
307 isActive = false;
308 log.info("Server inactive: {}", ctx.channel());
309 super.channelInactive(ctx);
310 }
311
312 @Override
313 public void exceptionCaught(final ChannelHandlerContext ctx,
314 final Throwable cause) {
315 log.warn("Server close on exception.", cause);
316 ctx.close();
317 }
318
319 @Override
320 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
321 log.info("Server received: " + msg);
322 }
323 }
324 static final Logger log = LoggerFactory
325 .getLogger(UDTClientServerConnectionTest.class);
326
327
328
329
330
331
332 static final int WAIT_COUNT = 50;
333 static final int WAIT_SLEEP = 100;
334
335 @BeforeAll
336 public static void assumeUdt() {
337 assumeTrue(canLoadAndInit(), "com.barchart.udt.SocketUDT can not be loaded and initialized");
338 assumeFalse(PlatformDependent.isJ9Jvm(), "Not supported on J9 JVM");
339 }
340
341 private static boolean canLoadAndInit() {
342 try {
343 Class.forName("com.barchart.udt.SocketUDT", true,
344 UDTClientServerConnectionTest.class.getClassLoader());
345 return true;
346 } catch (Throwable e) {
347 return false;
348 }
349 }
350
351
352
353
354 @Test
355 public void connection() throws Exception {
356 log.info("Starting server.");
357
358 final Server server = new Server(new InetSocketAddress(NetUtil.LOCALHOST4, 0));
359 final Thread serverTread = new Thread(server, "server-*");
360 serverTread.start();
361 server.waitForRunning(true);
362 assertTrue(server.isRunning);
363
364 log.info("Starting client.");
365 final Client client = new Client((InetSocketAddress) server.channel.localAddress());
366 final Thread clientThread = new Thread(client, "client-*");
367 clientThread.start();
368 client.waitForRunning(true);
369 assertTrue(client.isRunning);
370
371 log.info("Wait till connection is active.");
372 client.waitForActive(true);
373 server.waitForActive(true);
374
375 log.info("Verify connection is active.");
376 assertEquals(1, server.group.size(), "group must have one");
377
378 log.info("Stopping client.");
379 client.shutdown();
380 client.waitForShutdown();
381 assertTrue(client.isShutdown);
382
383 log.info("Wait till connection is inactive.");
384 client.waitForActive(false);
385 server.waitForActive(false);
386
387 log.info("Verify connection is inactive.");
388 assertEquals(0, server.group.size(), "group must be empty");
389
390 log.info("Stopping server.");
391 server.shutdown();
392 server.waitForShutdown();
393 assertTrue(server.isShutdown);
394
395 log.info("Finished server.");
396 }
397
398 }