1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInboundHandlerAdapter;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.SimpleChannelInboundHandler;
29 import io.netty.channel.socket.DatagramChannel;
30 import io.netty.channel.socket.DatagramPacket;
31 import io.netty.util.CharsetUtil;
32 import io.netty.util.NetUtil;
33 import io.netty.util.internal.EmptyArrays;
34 import org.junit.jupiter.api.Test;
35 import org.junit.jupiter.api.TestInfo;
36 import org.opentest4j.TestAbortedException;
37
38 import java.net.BindException;
39 import java.net.DatagramSocket;
40 import java.net.Inet6Address;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.SocketAddress;
44 import java.net.SocketException;
45 import java.nio.channels.NotYetConnectedException;
46 import java.nio.channels.UnresolvedAddressException;
47 import java.util.ArrayList;
48 import java.util.List;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.Semaphore;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.atomic.AtomicReference;
53
54 import static org.assertj.core.api.Assumptions.assumeThat;
55 import static org.junit.jupiter.api.Assertions.assertFalse;
56 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
57 import static org.junit.jupiter.api.Assertions.assertNotNull;
58 import static org.junit.jupiter.api.Assertions.assertNull;
59 import static org.junit.jupiter.api.Assertions.assertTrue;
60 import static org.junit.jupiter.api.Assertions.fail;
61
62 public abstract class DatagramUnicastTest extends AbstractDatagramTest {
63
64 private static final byte[] BYTES = {0, 1, 2, 3};
65 protected enum WrapType {
66 NONE, DUP, SLICE, READ_ONLY
67 }
68
69 @Test
70 public void testSimpleSendDirectByteBuf(TestInfo testInfo) throws Throwable {
71 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
72 @Override
73 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
74 testSimpleSendDirectByteBuf(bootstrap, bootstrap2);
75 }
76 });
77 }
78
79 public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
80 testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 1);
81 testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 4);
82 }
83
84 @Test
85 public void testSimpleSendHeapByteBuf(TestInfo testInfo) throws Throwable {
86 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
87 @Override
88 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
89 testSimpleSendHeapByteBuf(bootstrap, bootstrap2);
90 }
91 });
92 }
93
94 public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
95 testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 1);
96 testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 4);
97 }
98
99 @Test
100 public void testSimpleSendCompositeDirectByteBuf(TestInfo testInfo) throws Throwable {
101 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
102 @Override
103 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
104 testSimpleSendCompositeDirectByteBuf(bootstrap, bootstrap2);
105 }
106 });
107 }
108
109 public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
110 CompositeByteBuf buf = Unpooled.compositeBuffer();
111 buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
112 buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
113 testSimpleSend(sb, cb, buf, true, BYTES, 1);
114
115 CompositeByteBuf buf2 = Unpooled.compositeBuffer();
116 buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
117 buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
118 testSimpleSend(sb, cb, buf2, true, BYTES, 4);
119 }
120
121 @Test
122 public void testSimpleSendCompositeHeapByteBuf(TestInfo testInfo) throws Throwable {
123 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
124 @Override
125 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
126 testSimpleSendCompositeHeapByteBuf(bootstrap, bootstrap2);
127 }
128 });
129 }
130
131 public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
132 CompositeByteBuf buf = Unpooled.compositeBuffer();
133 buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
134 buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
135 testSimpleSend(sb, cb, buf, true, BYTES, 1);
136
137 CompositeByteBuf buf2 = Unpooled.compositeBuffer();
138 buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
139 buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
140 testSimpleSend(sb, cb, buf2, true, BYTES, 4);
141 }
142
143 @Test
144 public void testSimpleSendCompositeMixedByteBuf(TestInfo testInfo) throws Throwable {
145 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
146 @Override
147 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
148 testSimpleSendCompositeMixedByteBuf(bootstrap, bootstrap2);
149 }
150 });
151 }
152
153 public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
154 CompositeByteBuf buf = Unpooled.compositeBuffer();
155 buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
156 buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
157 testSimpleSend(sb, cb, buf, true, BYTES, 1);
158
159 CompositeByteBuf buf2 = Unpooled.compositeBuffer();
160 buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
161 buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
162 testSimpleSend(sb, cb, buf2, true, BYTES, 4);
163 }
164
165 @Test
166 public void testSimpleSendWithoutBind(TestInfo testInfo) throws Throwable {
167 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
168 @Override
169 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
170 testSimpleSendWithoutBind(bootstrap, bootstrap2);
171 }
172 });
173 }
174
175 public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
176 testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 1);
177 testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 4);
178 }
179
180 private void testSimpleSend(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
181 final byte[] bytes, int count) throws Throwable {
182 for (WrapType type: WrapType.values()) {
183 testSimpleSend0(sb, cb, buf.retain(), bindClient, bytes, count, type);
184 }
185 assertTrue(buf.release());
186 }
187
188 @Test
189 public void testSimpleSendWithConnect(TestInfo testInfo) throws Throwable {
190 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
191 @Override
192 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
193 testSimpleSendWithConnect(bootstrap, bootstrap2);
194 }
195 });
196 }
197
198 public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
199 testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 1);
200 testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 4);
201 }
202
203 @Test
204 public void testReceiveEmptyDatagrams(TestInfo testInfo) throws Throwable {
205 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
206 @Override
207 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
208 testReceiveEmptyDatagrams(bootstrap, bootstrap2);
209 }
210 });
211 }
212
213 public void testReceiveEmptyDatagrams(Bootstrap sb, Bootstrap cb) throws Throwable {
214 final Semaphore semaphore = new Semaphore(0);
215 Channel server = sb.handler(new ChannelInitializer<Channel>() {
216 @Override
217 protected void initChannel(Channel ch) throws Exception {
218 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
219 @Override
220 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
221 semaphore.release();
222 }
223 });
224 }
225 }).bind(newSocketAddress()).sync().channel();
226
227 SocketAddress address = server.localAddress();
228 DatagramSocket client;
229 try {
230 client = new DatagramSocket(newSocketAddress());
231 } catch (IllegalArgumentException e) {
232 assumeThat(e.getMessage()).doesNotContainIgnoringCase("unsupported address type");
233 throw e;
234 }
235 SocketAddress sendAddress = address instanceof InetSocketAddress ?
236 sendToAddress((InetSocketAddress) address) : address;
237 for (int i = 0; i < 100; i++) {
238 try {
239 client.send(new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0, sendAddress));
240 } catch (BindException e) {
241 throw new TestAbortedException("JDK sockets do not support binding to these addresses.", e);
242 }
243 semaphore.acquire();
244 }
245 }
246
247 @Test
248 public void testSendToUnresolvableAddress(TestInfo testInfo) throws Throwable {
249 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
250 @Override
251 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
252 testSendToUnresolvableAddress(bootstrap, bootstrap2);
253 }
254 });
255 }
256
257 public void testSendToUnresolvableAddress(Bootstrap sb, Bootstrap cb) throws Throwable {
258 SocketAddress serverAddress = newSocketAddress();
259 if (!(serverAddress instanceof InetSocketAddress)) {
260 return;
261 }
262 Channel sc = sb.handler(new ChannelInitializer<Channel>() {
263 @Override
264 protected void initChannel(Channel ch) throws Exception {
265 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
266 @Override
267 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
268
269 }
270 });
271 }
272 }).bind(serverAddress).sync().channel();
273
274 Channel cc = cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true).
275 handler(new ChannelInboundHandlerAdapter()).register().sync().channel();
276 try {
277 InetSocketAddress goodHost = sendToAddress((InetSocketAddress) sc.localAddress());
278 InetSocketAddress unresolvedHost = new InetSocketAddress("NOT_A_REAL_ADDRESS", goodHost.getPort());
279
280 assertFalse(goodHost.isUnresolved());
281 assertTrue(unresolvedHost.isUnresolved());
282
283 String message = "hello world!";
284 cc.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), goodHost)).sync();
285 assertInstanceOf(UnresolvedAddressException.class, cc.writeAndFlush(new DatagramPacket(
286 Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), unresolvedHost)).await().cause());
287
288
289 assertTrue(cc.isOpen());
290
291
292 cc.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), goodHost)).sync();
293 assertInstanceOf(UnresolvedAddressException.class, cc.writeAndFlush(new DatagramPacket(
294 Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), unresolvedHost)).await().cause());
295 assertTrue(cc.isOpen());
296 } finally {
297 closeChannel(cc);
298 closeChannel(sc);
299 }
300 }
301
302 @SuppressWarnings("deprecation")
303 private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
304 final byte[] bytes, int count, WrapType wrapType)
305 throws Throwable {
306 Channel sc = null;
307 Channel cc = null;
308
309 try {
310 cb.handler(new SimpleChannelInboundHandler<Object>() {
311 @Override
312 public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
313
314 }
315 });
316
317 final SocketAddress sender;
318 if (bindClient) {
319 cc = cb.bind(newSocketAddress()).sync().channel();
320 sender = cc.localAddress();
321 } else {
322 cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
323 cc = cb.register().sync().channel();
324 sender = null;
325 }
326
327 final CountDownLatch latch = new CountDownLatch(count);
328 AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
329 sc = setupServerChannel(sb, bytes, sender, latch, errorRef, false);
330
331 SocketAddress localAddr = sc.localAddress();
332 SocketAddress addr = localAddr instanceof InetSocketAddress ?
333 sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
334 List<ChannelFuture> futures = new ArrayList<ChannelFuture>(count);
335 for (int i = 0; i < count; i++) {
336 futures.add(write(cc, buf, addr, wrapType));
337 }
338
339 cc.flush();
340
341 for (ChannelFuture future: futures) {
342 future.sync();
343 }
344 if (!latch.await(10, TimeUnit.SECONDS)) {
345 Throwable error = errorRef.get();
346 if (error != null) {
347 throw error;
348 }
349 fail();
350 }
351 } finally {
352
353 buf.release();
354
355 closeChannel(cc);
356 closeChannel(sc);
357 }
358 }
359
360 private void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count)
361 throws Throwable {
362 try {
363 for (WrapType type : WrapType.values()) {
364 testSimpleSendWithConnect0(sb, cb, buf.retain(), bytes, count, type);
365 }
366 } finally {
367 assertTrue(buf.release());
368 }
369 }
370
371 private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count,
372 WrapType wrapType) throws Throwable {
373 Channel sc = null;
374 Channel cc = null;
375 try {
376 final CountDownLatch latch = new CountDownLatch(count);
377 final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
378 final CountDownLatch clientLatch = new CountDownLatch(count);
379 final AtomicReference<Throwable> clientErrorRef = new AtomicReference<Throwable>();
380 cc = setupClientChannel(cb, bytes, clientLatch, clientErrorRef);
381 sc = setupServerChannel(sb, bytes, cc.localAddress(), latch, errorRef, true);
382
383 SocketAddress localAddr = sc.localAddress();
384 SocketAddress addr = localAddr instanceof InetSocketAddress ?
385 sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
386 cc.connect(addr).syncUninterruptibly();
387
388 List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
389 for (int i = 0; i < count; i++) {
390 futures.add(write(cc, buf, wrapType));
391 }
392 cc.flush();
393
394 for (ChannelFuture future: futures) {
395 future.sync();
396 }
397
398 if (!latch.await(10, TimeUnit.SECONDS)) {
399 Throwable cause = errorRef.get();
400 if (cause != null) {
401 throw cause;
402 }
403 fail();
404 }
405 if (!clientLatch.await(10, TimeUnit.SECONDS)) {
406 Throwable cause = clientErrorRef.get();
407 if (cause != null) {
408 throw cause;
409 }
410 fail();
411 }
412 assertTrue(isConnected(cc));
413
414 assertNotNull(cc.localAddress());
415 assertNotNull(cc.remoteAddress());
416
417 if (supportDisconnect()) {
418 try {
419
420 cc.disconnect().syncUninterruptibly();
421 } catch (Throwable e) {
422 if (e instanceof SocketException) {
423 if (disconnectMightFail((DatagramChannel) cc)) {
424 return;
425 }
426 }
427 throw e;
428 }
429 assertFalse(isConnected(cc));
430 assertNotNull(cc.localAddress());
431 assertNull(cc.remoteAddress());
432
433 ChannelFuture future = cc.writeAndFlush(
434 buf.retain().duplicate()).awaitUninterruptibly();
435 assertTrue(future.cause() instanceof NotYetConnectedException,
436 "NotYetConnectedException expected, got: " + future.cause());
437 }
438 } finally {
439
440 buf.release();
441
442 closeChannel(cc);
443 closeChannel(sc);
444 }
445 }
446
447 private static ChannelFuture write(Channel cc, ByteBuf buf, WrapType wrapType) {
448 switch (wrapType) {
449 case DUP:
450 return cc.write(buf.retainedDuplicate());
451 case SLICE:
452 return cc.write(buf.retainedSlice());
453 case READ_ONLY:
454 return cc.write(buf.retain().asReadOnly());
455 case NONE:
456 return cc.write(buf.retain());
457 default:
458 throw new Error("unknown wrap type: " + wrapType);
459 }
460 }
461
462 protected abstract boolean isConnected(Channel channel);
463
464 protected abstract Channel setupClientChannel(Bootstrap cb, byte[] bytes, CountDownLatch latch,
465 AtomicReference<Throwable> errorRef) throws Throwable;
466
467 protected abstract Channel setupServerChannel(Bootstrap sb, byte[] bytes, SocketAddress sender,
468 CountDownLatch latch, AtomicReference<Throwable> errorRef,
469 boolean echo) throws Throwable;
470
471 protected abstract boolean supportDisconnect();
472
473 protected boolean disconnectMightFail(DatagramChannel channel) {
474 return false;
475 }
476
477 protected abstract ChannelFuture write(Channel cc, ByteBuf buf, SocketAddress remote, WrapType wrapType);
478
479 protected static void closeChannel(Channel channel) throws Exception {
480 if (channel != null) {
481 channel.close().sync();
482 }
483 }
484
485 protected InetSocketAddress sendToAddress(InetSocketAddress serverAddress) {
486 InetAddress addr = serverAddress.getAddress();
487 if (addr.isAnyLocalAddress()) {
488 if (addr instanceof Inet6Address) {
489 return new InetSocketAddress(NetUtil.LOCALHOST6, serverAddress.getPort());
490 }
491 return new InetSocketAddress(NetUtil.LOCALHOST4, serverAddress.getPort());
492 }
493 return serverAddress;
494 }
495 }