1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInboundHandlerAdapter;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.SimpleChannelInboundHandler;
29 import io.netty.channel.socket.DatagramChannel;
30 import io.netty.channel.socket.DatagramPacket;
31 import io.netty.util.CharsetUtil;
32 import io.netty.util.NetUtil;
33 import io.netty.util.internal.EmptyArrays;
34 import org.junit.jupiter.api.Test;
35 import org.junit.jupiter.api.TestInfo;
36 import org.opentest4j.TestAbortedException;
37
38 import java.net.BindException;
39 import java.net.DatagramSocket;
40 import java.net.Inet6Address;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.SocketAddress;
44 import java.net.SocketException;
45 import java.nio.channels.NotYetConnectedException;
46 import java.nio.channels.UnresolvedAddressException;
47 import java.util.ArrayList;
48 import java.util.List;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.Semaphore;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.atomic.AtomicReference;
53
54 import static org.assertj.core.api.Assumptions.assumeThat;
55 import static org.junit.jupiter.api.Assertions.assertFalse;
56 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
57 import static org.junit.jupiter.api.Assertions.assertNotNull;
58 import static org.junit.jupiter.api.Assertions.assertNull;
59 import static org.junit.jupiter.api.Assertions.assertTrue;
60 import static org.junit.jupiter.api.Assertions.fail;
61
62 public abstract class DatagramUnicastTest extends AbstractDatagramTest {
63
64 private static final byte[] BYTES = {0, 1, 2, 3};
65 protected enum WrapType {
66 NONE, DUP, SLICE, READ_ONLY
67 }
68
69 @Test
70 public void testSimpleSendDirectByteBuf(TestInfo testInfo) throws Throwable {
71 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
72 @Override
73 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
74 testSimpleSendDirectByteBuf(bootstrap, bootstrap2);
75 }
76 });
77 }
78
79 public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
80 testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 1);
81 testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 4);
82 }
83
84 @Test
85 public void testSimpleSendHeapByteBuf(TestInfo testInfo) throws Throwable {
86 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
87 @Override
88 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
89 testSimpleSendHeapByteBuf(bootstrap, bootstrap2);
90 }
91 });
92 }
93
94 public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
95 testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 1);
96 testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 4);
97 }
98
99 @Test
100 public void testSimpleSendCompositeDirectByteBuf(TestInfo testInfo) throws Throwable {
101 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
102 @Override
103 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
104 testSimpleSendCompositeDirectByteBuf(bootstrap, bootstrap2);
105 }
106 });
107 }
108
109 public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
110 CompositeByteBuf buf = Unpooled.compositeBuffer();
111 buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
112 buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
113 testSimpleSend(sb, cb, buf, true, BYTES, 1);
114
115 CompositeByteBuf buf2 = Unpooled.compositeBuffer();
116 buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
117 buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
118 testSimpleSend(sb, cb, buf2, true, BYTES, 4);
119 }
120
121 @Test
122 public void testSimpleSendCompositeHeapByteBuf(TestInfo testInfo) throws Throwable {
123 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
124 @Override
125 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
126 testSimpleSendCompositeHeapByteBuf(bootstrap, bootstrap2);
127 }
128 });
129 }
130
131 public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
132 CompositeByteBuf buf = Unpooled.compositeBuffer();
133 buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
134 buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
135 testSimpleSend(sb, cb, buf, true, BYTES, 1);
136
137 CompositeByteBuf buf2 = Unpooled.compositeBuffer();
138 buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
139 buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
140 testSimpleSend(sb, cb, buf2, true, BYTES, 4);
141 }
142
143 @Test
144 public void testSimpleSendCompositeMixedByteBuf(TestInfo testInfo) throws Throwable {
145 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
146 @Override
147 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
148 testSimpleSendCompositeMixedByteBuf(bootstrap, bootstrap2);
149 }
150 });
151 }
152
153 public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
154 CompositeByteBuf buf = Unpooled.compositeBuffer();
155 buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
156 buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
157 testSimpleSend(sb, cb, buf, true, BYTES, 1);
158
159 CompositeByteBuf buf2 = Unpooled.compositeBuffer();
160 buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
161 buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
162 testSimpleSend(sb, cb, buf2, true, BYTES, 4);
163 }
164
165 @Test
166 public void testSimpleSendWithoutBind(TestInfo testInfo) throws Throwable {
167 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
168 @Override
169 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
170 testSimpleSendWithoutBind(bootstrap, bootstrap2);
171 }
172 });
173 }
174
175 public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
176 testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 1);
177 testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 4);
178 }
179
180 private void testSimpleSend(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
181 final byte[] bytes, int count) throws Throwable {
182 try {
183 for (WrapType type: WrapType.values()) {
184 testSimpleSend0(sb, cb, buf.retain(), bindClient, bytes, count, type);
185 }
186 } finally {
187 assertTrue(buf.release());
188 }
189 }
190
191 @Test
192 public void testSimpleSendWithConnect(TestInfo testInfo) throws Throwable {
193 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
194 @Override
195 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
196 testSimpleSendWithConnect(bootstrap, bootstrap2);
197 }
198 });
199 }
200
201 public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
202 testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 1);
203 testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 4);
204 }
205
206 @Test
207 public void testReceiveEmptyDatagrams(TestInfo testInfo) throws Throwable {
208 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
209 @Override
210 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
211 testReceiveEmptyDatagrams(bootstrap, bootstrap2);
212 }
213 });
214 }
215
216 public void testReceiveEmptyDatagrams(Bootstrap sb, Bootstrap cb) throws Throwable {
217 final Semaphore semaphore = new Semaphore(0);
218 Channel server = sb.handler(new ChannelInitializer<Channel>() {
219 @Override
220 protected void initChannel(Channel ch) throws Exception {
221 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
222 @Override
223 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
224 semaphore.release();
225 }
226 });
227 }
228 }).bind(newSocketAddress()).sync().channel();
229
230 SocketAddress address = server.localAddress();
231 DatagramSocket client;
232 try {
233 client = new DatagramSocket(newSocketAddress());
234 } catch (IllegalArgumentException e) {
235 assumeThat(e.getMessage()).doesNotContainIgnoringCase("unsupported address type");
236 throw e;
237 }
238 SocketAddress sendAddress = address instanceof InetSocketAddress ?
239 sendToAddress((InetSocketAddress) address) : address;
240 for (int i = 0; i < 100; i++) {
241 try {
242 client.send(new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0, sendAddress));
243 } catch (BindException e) {
244 throw new TestAbortedException("JDK sockets do not support binding to these addresses.", e);
245 }
246 semaphore.acquire();
247 }
248 }
249
250 @Test
251 public void testSendToUnresolvableAddress(TestInfo testInfo) throws Throwable {
252 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
253 @Override
254 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
255 testSendToUnresolvableAddress(bootstrap, bootstrap2);
256 }
257 });
258 }
259
260 public void testSendToUnresolvableAddress(Bootstrap sb, Bootstrap cb) throws Throwable {
261 SocketAddress serverAddress = newSocketAddress();
262 if (!(serverAddress instanceof InetSocketAddress)) {
263 return;
264 }
265 Channel sc = sb.handler(new ChannelInitializer<Channel>() {
266 @Override
267 protected void initChannel(Channel ch) throws Exception {
268 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
269 @Override
270 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
271
272 }
273 });
274 }
275 }).bind(serverAddress).sync().channel();
276
277 Channel cc = cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true).
278 handler(new ChannelInboundHandlerAdapter()).register().sync().channel();
279 try {
280 InetSocketAddress goodHost = sendToAddress((InetSocketAddress) sc.localAddress());
281 InetSocketAddress unresolvedHost = new InetSocketAddress("NOT_A_REAL_ADDRESS", goodHost.getPort());
282
283 assertFalse(goodHost.isUnresolved());
284 assertTrue(unresolvedHost.isUnresolved());
285
286 String message = "hello world!";
287 cc.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), goodHost)).sync();
288 assertInstanceOf(UnresolvedAddressException.class, cc.writeAndFlush(new DatagramPacket(
289 Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), unresolvedHost)).await().cause());
290
291
292 assertTrue(cc.isOpen());
293
294
295 cc.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), goodHost)).sync();
296 assertInstanceOf(UnresolvedAddressException.class, cc.writeAndFlush(new DatagramPacket(
297 Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), unresolvedHost)).await().cause());
298 assertTrue(cc.isOpen());
299 } finally {
300 closeChannel(cc);
301 closeChannel(sc);
302 }
303 }
304
305 @SuppressWarnings("deprecation")
306 private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
307 final byte[] bytes, int count, WrapType wrapType)
308 throws Throwable {
309 Channel sc = null;
310 Channel cc = null;
311
312 try {
313 cb.handler(new SimpleChannelInboundHandler<Object>() {
314 @Override
315 public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
316
317 }
318 });
319
320 final SocketAddress sender;
321 if (bindClient) {
322 cc = cb.bind(newSocketAddress()).sync().channel();
323 sender = cc.localAddress();
324 } else {
325 cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
326 cc = cb.register().sync().channel();
327 sender = null;
328 }
329
330 final CountDownLatch latch = new CountDownLatch(count);
331 AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
332 sc = setupServerChannel(sb, bytes, sender, latch, errorRef, false);
333
334 SocketAddress localAddr = sc.localAddress();
335 SocketAddress addr = localAddr instanceof InetSocketAddress ?
336 sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
337 List<ChannelFuture> futures = new ArrayList<ChannelFuture>(count);
338 for (int i = 0; i < count; i++) {
339 futures.add(write(cc, buf, addr, wrapType));
340 }
341
342 cc.flush();
343
344 for (ChannelFuture future: futures) {
345 future.sync();
346 }
347 if (!latch.await(10, TimeUnit.SECONDS)) {
348 Throwable error = errorRef.get();
349 if (error != null) {
350 throw error;
351 }
352 fail();
353 }
354 } finally {
355
356 buf.release();
357
358 closeChannel(cc);
359 closeChannel(sc);
360 }
361 }
362
363 private void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count)
364 throws Throwable {
365 try {
366 for (WrapType type : WrapType.values()) {
367 testSimpleSendWithConnect0(sb, cb, buf.retain(), bytes, count, type);
368 }
369 } finally {
370 assertTrue(buf.release());
371 }
372 }
373
374 private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count,
375 WrapType wrapType) throws Throwable {
376 Channel sc = null;
377 Channel cc = null;
378 try {
379 final CountDownLatch latch = new CountDownLatch(count);
380 final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
381 final CountDownLatch clientLatch = new CountDownLatch(count);
382 final AtomicReference<Throwable> clientErrorRef = new AtomicReference<Throwable>();
383 cc = setupClientChannel(cb, bytes, clientLatch, clientErrorRef);
384 sc = setupServerChannel(sb, bytes, cc.localAddress(), latch, errorRef, true);
385
386 SocketAddress localAddr = sc.localAddress();
387 SocketAddress addr = localAddr instanceof InetSocketAddress ?
388 sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
389 cc.connect(addr).syncUninterruptibly();
390
391 List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
392 for (int i = 0; i < count; i++) {
393 futures.add(write(cc, buf, wrapType));
394 }
395 cc.flush();
396
397 for (ChannelFuture future: futures) {
398 future.sync();
399 }
400
401 if (!latch.await(10, TimeUnit.SECONDS)) {
402 Throwable cause = errorRef.get();
403 if (cause != null) {
404 throw cause;
405 }
406 fail();
407 }
408 if (!clientLatch.await(10, TimeUnit.SECONDS)) {
409 Throwable cause = clientErrorRef.get();
410 if (cause != null) {
411 throw cause;
412 }
413 fail();
414 }
415 assertTrue(isConnected(cc));
416
417 assertNotNull(cc.localAddress());
418 assertNotNull(cc.remoteAddress());
419
420 if (supportDisconnect()) {
421 try {
422
423 cc.disconnect().syncUninterruptibly();
424 } catch (Throwable e) {
425 if (e instanceof SocketException) {
426 if (disconnectMightFail((DatagramChannel) cc)) {
427 return;
428 }
429 }
430 throw e;
431 }
432 assertFalse(isConnected(cc));
433 assertNotNull(cc.localAddress());
434 assertNull(cc.remoteAddress());
435
436 ChannelFuture future = cc.writeAndFlush(
437 buf.retain().duplicate()).awaitUninterruptibly();
438 assertTrue(future.cause() instanceof NotYetConnectedException,
439 "NotYetConnectedException expected, got: " + future.cause());
440 }
441 } finally {
442
443 buf.release();
444
445 closeChannel(cc);
446 closeChannel(sc);
447 }
448 }
449
450 private static ChannelFuture write(Channel cc, ByteBuf buf, WrapType wrapType) {
451 switch (wrapType) {
452 case DUP:
453 return cc.write(buf.retainedDuplicate());
454 case SLICE:
455 return cc.write(buf.retainedSlice());
456 case READ_ONLY:
457 return cc.write(buf.retain().asReadOnly());
458 case NONE:
459 return cc.write(buf.retain());
460 default:
461 throw new Error("unknown wrap type: " + wrapType);
462 }
463 }
464
465 protected abstract boolean isConnected(Channel channel);
466
467 protected abstract Channel setupClientChannel(Bootstrap cb, byte[] bytes, CountDownLatch latch,
468 AtomicReference<Throwable> errorRef) throws Throwable;
469
470 protected abstract Channel setupServerChannel(Bootstrap sb, byte[] bytes, SocketAddress sender,
471 CountDownLatch latch, AtomicReference<Throwable> errorRef,
472 boolean echo) throws Throwable;
473
474 protected abstract boolean supportDisconnect();
475
476 protected boolean disconnectMightFail(DatagramChannel channel) {
477 return false;
478 }
479
480 protected abstract ChannelFuture write(Channel cc, ByteBuf buf, SocketAddress remote, WrapType wrapType);
481
482 protected static void closeChannel(Channel channel) throws Exception {
483 if (channel != null) {
484 channel.close().sync();
485 }
486 }
487
488 protected InetSocketAddress sendToAddress(InetSocketAddress serverAddress) {
489 InetAddress addr = serverAddress.getAddress();
490 if (addr.isAnyLocalAddress()) {
491 if (addr instanceof Inet6Address) {
492 return new InetSocketAddress(NetUtil.LOCALHOST6, serverAddress.getPort());
493 }
494 return new InetSocketAddress(NetUtil.LOCALHOST4, serverAddress.getPort());
495 }
496 return serverAddress;
497 }
498 }