View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelInboundHandlerAdapter;
26  import io.netty.channel.ChannelInitializer;
27  import io.netty.channel.ChannelOption;
28  import io.netty.channel.SimpleChannelInboundHandler;
29  import io.netty.channel.socket.DatagramChannel;
30  import io.netty.channel.socket.DatagramPacket;
31  import io.netty.util.CharsetUtil;
32  import io.netty.util.NetUtil;
33  import io.netty.util.internal.EmptyArrays;
34  import org.junit.jupiter.api.Test;
35  import org.junit.jupiter.api.TestInfo;
36  import org.opentest4j.TestAbortedException;
37  
38  import java.net.BindException;
39  import java.net.DatagramSocket;
40  import java.net.Inet6Address;
41  import java.net.InetAddress;
42  import java.net.InetSocketAddress;
43  import java.net.SocketAddress;
44  import java.net.SocketException;
45  import java.nio.channels.NotYetConnectedException;
46  import java.nio.channels.UnresolvedAddressException;
47  import java.util.ArrayList;
48  import java.util.List;
49  import java.util.concurrent.CountDownLatch;
50  import java.util.concurrent.Semaphore;
51  import java.util.concurrent.TimeUnit;
52  import java.util.concurrent.atomic.AtomicReference;
53  
54  import static org.assertj.core.api.Assumptions.assumeThat;
55  import static org.junit.jupiter.api.Assertions.assertFalse;
56  import static org.junit.jupiter.api.Assertions.assertInstanceOf;
57  import static org.junit.jupiter.api.Assertions.assertNotNull;
58  import static org.junit.jupiter.api.Assertions.assertNull;
59  import static org.junit.jupiter.api.Assertions.assertTrue;
60  import static org.junit.jupiter.api.Assertions.fail;
61  
62  public abstract class DatagramUnicastTest extends AbstractDatagramTest {
63  
64      private static final byte[] BYTES = {0, 1, 2, 3};
65      protected enum WrapType {
66          NONE, DUP, SLICE, READ_ONLY
67      }
68  
69      @Test
70      public void testSimpleSendDirectByteBuf(TestInfo testInfo) throws Throwable {
71          run(testInfo, new Runner<Bootstrap, Bootstrap>() {
72              @Override
73              public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
74                  testSimpleSendDirectByteBuf(bootstrap, bootstrap2);
75              }
76          });
77      }
78  
79      public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
80          testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 1);
81          testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 4);
82      }
83  
84      @Test
85      public void testSimpleSendHeapByteBuf(TestInfo testInfo) throws Throwable {
86          run(testInfo, new Runner<Bootstrap, Bootstrap>() {
87              @Override
88              public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
89                  testSimpleSendHeapByteBuf(bootstrap, bootstrap2);
90              }
91          });
92      }
93  
94      public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
95          testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 1);
96          testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 4);
97      }
98  
99      @Test
100     public void testSimpleSendCompositeDirectByteBuf(TestInfo testInfo) throws Throwable {
101         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
102             @Override
103             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
104                 testSimpleSendCompositeDirectByteBuf(bootstrap, bootstrap2);
105             }
106         });
107     }
108 
109     public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
110         CompositeByteBuf buf = Unpooled.compositeBuffer();
111         buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
112         buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
113         testSimpleSend(sb, cb, buf, true, BYTES, 1);
114 
115         CompositeByteBuf buf2 = Unpooled.compositeBuffer();
116         buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
117         buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
118         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
119     }
120 
121     @Test
122     public void testSimpleSendCompositeHeapByteBuf(TestInfo testInfo) throws Throwable {
123         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
124             @Override
125             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
126                 testSimpleSendCompositeHeapByteBuf(bootstrap, bootstrap2);
127             }
128         });
129     }
130 
131     public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
132         CompositeByteBuf buf = Unpooled.compositeBuffer();
133         buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
134         buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
135         testSimpleSend(sb, cb, buf, true, BYTES, 1);
136 
137         CompositeByteBuf buf2 = Unpooled.compositeBuffer();
138         buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
139         buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
140         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
141     }
142 
143     @Test
144     public void testSimpleSendCompositeMixedByteBuf(TestInfo testInfo) throws Throwable {
145         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
146             @Override
147             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
148                 testSimpleSendCompositeMixedByteBuf(bootstrap, bootstrap2);
149             }
150         });
151     }
152 
153     public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
154         CompositeByteBuf buf = Unpooled.compositeBuffer();
155         buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
156         buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
157         testSimpleSend(sb, cb, buf, true, BYTES, 1);
158 
159         CompositeByteBuf buf2 = Unpooled.compositeBuffer();
160         buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
161         buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
162         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
163     }
164 
165     @Test
166     public void testSimpleSendWithoutBind(TestInfo testInfo) throws Throwable {
167         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
168             @Override
169             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
170                 testSimpleSendWithoutBind(bootstrap, bootstrap2);
171             }
172         });
173     }
174 
175     public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
176         testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 1);
177         testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 4);
178     }
179 
180     private void testSimpleSend(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
181                                 final byte[] bytes, int count) throws Throwable {
182         try {
183             for (WrapType type: WrapType.values()) {
184                 testSimpleSend0(sb, cb, buf.retain(), bindClient, bytes, count, type);
185             }
186         } finally {
187             assertTrue(buf.release());
188         }
189     }
190 
191     @Test
192     public void testSimpleSendWithConnect(TestInfo testInfo) throws Throwable {
193         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
194             @Override
195             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
196                 testSimpleSendWithConnect(bootstrap, bootstrap2);
197             }
198         });
199     }
200 
201     public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
202         testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 1);
203         testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 4);
204     }
205 
206     @Test
207     public void testReceiveEmptyDatagrams(TestInfo testInfo) throws Throwable {
208         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
209             @Override
210             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
211                 testReceiveEmptyDatagrams(bootstrap, bootstrap2);
212             }
213         });
214     }
215 
216     public void testReceiveEmptyDatagrams(Bootstrap sb, Bootstrap cb) throws Throwable {
217         final Semaphore semaphore = new Semaphore(0);
218         Channel server = sb.handler(new ChannelInitializer<Channel>() {
219             @Override
220             protected void initChannel(Channel ch) throws Exception {
221                 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
222                     @Override
223                     protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
224                         semaphore.release();
225                     }
226                 });
227             }
228         }).bind(newSocketAddress()).sync().channel();
229 
230         SocketAddress address = server.localAddress();
231         DatagramSocket client;
232         try {
233             client = new DatagramSocket(newSocketAddress());
234         } catch (IllegalArgumentException e) {
235             assumeThat(e.getMessage()).doesNotContainIgnoringCase("unsupported address type");
236             throw e;
237         }
238         SocketAddress sendAddress = address instanceof InetSocketAddress ?
239                 sendToAddress((InetSocketAddress) address) : address;
240         for (int i = 0; i < 100; i++) {
241             try {
242                 client.send(new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0, sendAddress));
243             } catch (BindException e) {
244                 throw new TestAbortedException("JDK sockets do not support binding to these addresses.", e);
245             }
246             semaphore.acquire();
247         }
248     }
249 
250     @Test
251     public void testSendToUnresolvableAddress(TestInfo testInfo) throws Throwable {
252         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
253             @Override
254             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
255                 testSendToUnresolvableAddress(bootstrap, bootstrap2);
256             }
257         });
258     }
259 
260     public void testSendToUnresolvableAddress(Bootstrap sb, Bootstrap cb) throws Throwable {
261         SocketAddress serverAddress = newSocketAddress();
262         if (!(serverAddress instanceof InetSocketAddress)) {
263             return;
264         }
265         Channel sc = sb.handler(new ChannelInitializer<Channel>() {
266             @Override
267             protected void initChannel(Channel ch) throws Exception {
268                 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
269                     @Override
270                     protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
271                         // Just drop
272                     }
273                 });
274             }
275         }).bind(serverAddress).sync().channel();
276 
277         Channel cc = cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true).
278                 handler(new ChannelInboundHandlerAdapter()).register().sync().channel();
279         try {
280             InetSocketAddress goodHost = sendToAddress((InetSocketAddress) sc.localAddress());
281             InetSocketAddress unresolvedHost = new InetSocketAddress("NOT_A_REAL_ADDRESS", goodHost.getPort());
282 
283             assertFalse(goodHost.isUnresolved());
284             assertTrue(unresolvedHost.isUnresolved());
285 
286             String message = "hello world!";
287             cc.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), goodHost)).sync();
288             assertInstanceOf(UnresolvedAddressException.class, cc.writeAndFlush(new DatagramPacket(
289                     Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), unresolvedHost)).await().cause());
290 
291             // DatagramChannel should still be open after sending to unresolved address
292             assertTrue(cc.isOpen());
293 
294             // DatagramChannel should still be able to send messages outbound
295             cc.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), goodHost)).sync();
296             assertInstanceOf(UnresolvedAddressException.class, cc.writeAndFlush(new DatagramPacket(
297                     Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), unresolvedHost)).await().cause());
298             assertTrue(cc.isOpen());
299         } finally {
300             closeChannel(cc);
301             closeChannel(sc);
302         }
303     }
304 
305     @SuppressWarnings("deprecation")
306     private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
307                                 final byte[] bytes, int count, WrapType wrapType)
308             throws Throwable {
309         Channel sc = null;
310         Channel cc = null;
311 
312         try {
313             cb.handler(new SimpleChannelInboundHandler<Object>() {
314                 @Override
315                 public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
316                     // Nothing will be sent.
317                 }
318             });
319 
320             final SocketAddress sender;
321             if (bindClient) {
322                 cc = cb.bind(newSocketAddress()).sync().channel();
323                 sender = cc.localAddress();
324             } else {
325                 cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
326                 cc = cb.register().sync().channel();
327                 sender = null;
328             }
329 
330             final CountDownLatch latch = new CountDownLatch(count);
331             AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
332             sc = setupServerChannel(sb, bytes, sender, latch, errorRef, false);
333 
334             SocketAddress localAddr = sc.localAddress();
335             SocketAddress addr = localAddr instanceof InetSocketAddress ?
336                     sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
337             List<ChannelFuture> futures = new ArrayList<ChannelFuture>(count);
338             for (int i = 0; i < count; i++) {
339                 futures.add(write(cc, buf, addr, wrapType));
340             }
341             // release as we used buf.retain() before
342             cc.flush();
343 
344             for (ChannelFuture future: futures) {
345                 future.sync();
346             }
347             if (!latch.await(10, TimeUnit.SECONDS)) {
348                 Throwable error = errorRef.get();
349                 if (error != null) {
350                     throw error;
351                 }
352                 fail();
353             }
354         } finally {
355             // release as we used buf.retain() before
356             buf.release();
357 
358             closeChannel(cc);
359             closeChannel(sc);
360         }
361     }
362 
363     private void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count)
364             throws Throwable {
365         try {
366             for (WrapType type : WrapType.values()) {
367                 testSimpleSendWithConnect0(sb, cb, buf.retain(), bytes, count, type);
368             }
369         } finally {
370             assertTrue(buf.release());
371         }
372     }
373 
374     private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count,
375                                             WrapType wrapType) throws Throwable {
376         Channel sc = null;
377         Channel cc = null;
378         try {
379             final CountDownLatch latch = new CountDownLatch(count);
380             final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
381             final CountDownLatch clientLatch = new CountDownLatch(count);
382             final AtomicReference<Throwable> clientErrorRef = new AtomicReference<Throwable>();
383             cc = setupClientChannel(cb, bytes, clientLatch, clientErrorRef);
384             sc = setupServerChannel(sb, bytes, cc.localAddress(), latch, errorRef, true);
385 
386             SocketAddress localAddr = sc.localAddress();
387             SocketAddress addr = localAddr instanceof InetSocketAddress ?
388                     sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
389             cc.connect(addr).syncUninterruptibly();
390 
391             List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
392             for (int i = 0; i < count; i++) {
393                 futures.add(write(cc, buf, wrapType));
394             }
395             cc.flush();
396 
397             for (ChannelFuture future: futures) {
398                 future.sync();
399             }
400 
401             if (!latch.await(10, TimeUnit.SECONDS)) {
402                 Throwable cause = errorRef.get();
403                 if (cause != null) {
404                     throw cause;
405                 }
406                 fail();
407             }
408             if (!clientLatch.await(10, TimeUnit.SECONDS)) {
409                 Throwable cause = clientErrorRef.get();
410                 if (cause != null) {
411                     throw cause;
412                 }
413                 fail();
414             }
415             assertTrue(isConnected(cc));
416 
417             assertNotNull(cc.localAddress());
418             assertNotNull(cc.remoteAddress());
419 
420             if (supportDisconnect()) {
421                 try {
422                     // Test what happens when we call disconnect()
423                     cc.disconnect().syncUninterruptibly();
424                 } catch (Throwable e) {
425                     if (e instanceof SocketException) {
426                         if (disconnectMightFail((DatagramChannel) cc)) {
427                             return;
428                         }
429                     }
430                     throw e;
431                 }
432                 assertFalse(isConnected(cc));
433                 assertNotNull(cc.localAddress());
434                 assertNull(cc.remoteAddress());
435 
436                 ChannelFuture future = cc.writeAndFlush(
437                         buf.retain().duplicate()).awaitUninterruptibly();
438                 assertTrue(future.cause() instanceof NotYetConnectedException,
439                         "NotYetConnectedException expected, got: " + future.cause());
440             }
441         } finally {
442             // release as we used buf.retain() before
443             buf.release();
444 
445             closeChannel(cc);
446             closeChannel(sc);
447         }
448     }
449 
450     private static ChannelFuture write(Channel cc, ByteBuf buf, WrapType wrapType) {
451         switch (wrapType) {
452             case DUP:
453                 return cc.write(buf.retainedDuplicate());
454             case SLICE:
455                 return cc.write(buf.retainedSlice());
456             case READ_ONLY:
457                 return cc.write(buf.retain().asReadOnly());
458             case NONE:
459                 return cc.write(buf.retain());
460             default:
461                 throw new Error("unknown wrap type: " + wrapType);
462         }
463     }
464 
465     protected abstract boolean isConnected(Channel channel);
466 
467     protected abstract Channel setupClientChannel(Bootstrap cb, byte[] bytes, CountDownLatch latch,
468                                                   AtomicReference<Throwable> errorRef) throws Throwable;
469 
470     protected abstract Channel setupServerChannel(Bootstrap sb, byte[] bytes, SocketAddress sender,
471                                                   CountDownLatch latch, AtomicReference<Throwable> errorRef,
472                                                   boolean echo) throws Throwable;
473 
474     protected abstract boolean supportDisconnect();
475 
476     protected boolean disconnectMightFail(DatagramChannel channel) {
477         return false;
478     }
479 
480     protected abstract ChannelFuture write(Channel cc, ByteBuf buf, SocketAddress remote, WrapType wrapType);
481 
482     protected static void closeChannel(Channel channel) throws Exception {
483         if (channel != null) {
484             channel.close().sync();
485         }
486     }
487 
488     protected InetSocketAddress sendToAddress(InetSocketAddress serverAddress) {
489         InetAddress addr = serverAddress.getAddress();
490         if (addr.isAnyLocalAddress()) {
491             if (addr instanceof Inet6Address) {
492                 return new InetSocketAddress(NetUtil.LOCALHOST6, serverAddress.getPort());
493             }
494             return new InetSocketAddress(NetUtil.LOCALHOST4, serverAddress.getPort());
495         }
496         return serverAddress;
497     }
498 }