View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelInboundHandlerAdapter;
26  import io.netty.channel.ChannelInitializer;
27  import io.netty.channel.ChannelOption;
28  import io.netty.channel.SimpleChannelInboundHandler;
29  import io.netty.channel.socket.DatagramChannel;
30  import io.netty.channel.socket.DatagramPacket;
31  import io.netty.util.CharsetUtil;
32  import io.netty.util.NetUtil;
33  import io.netty.util.internal.EmptyArrays;
34  import org.junit.jupiter.api.Test;
35  import org.junit.jupiter.api.TestInfo;
36  import org.opentest4j.TestAbortedException;
37  
38  import java.net.BindException;
39  import java.net.DatagramSocket;
40  import java.net.Inet6Address;
41  import java.net.InetAddress;
42  import java.net.InetSocketAddress;
43  import java.net.SocketAddress;
44  import java.net.SocketException;
45  import java.nio.channels.NotYetConnectedException;
46  import java.nio.channels.UnresolvedAddressException;
47  import java.util.ArrayList;
48  import java.util.List;
49  import java.util.concurrent.CountDownLatch;
50  import java.util.concurrent.Semaphore;
51  import java.util.concurrent.TimeUnit;
52  import java.util.concurrent.atomic.AtomicReference;
53  
54  import static org.assertj.core.api.Assumptions.assumeThat;
55  import static org.junit.jupiter.api.Assertions.assertFalse;
56  import static org.junit.jupiter.api.Assertions.assertInstanceOf;
57  import static org.junit.jupiter.api.Assertions.assertNotNull;
58  import static org.junit.jupiter.api.Assertions.assertNull;
59  import static org.junit.jupiter.api.Assertions.assertTrue;
60  import static org.junit.jupiter.api.Assertions.fail;
61  
62  public abstract class DatagramUnicastTest extends AbstractDatagramTest {
63  
    // Four-byte payload: the send tests write it into their buffers and also pass it on,
    // as the {@code bytes} argument, to the channel set-up hooks.
    private static final byte[] BYTES = {0, 1, 2, 3};
    /**
     * Selects how the payload {@link ByteBuf} is wrapped before it is written.
     * <p>
     * In the connected-send helper of this class the constants map to:
     * {@code NONE} - the retained buffer itself, {@code DUP} - {@code retainedDuplicate()},
     * {@code SLICE} - {@code retainedSlice()}, {@code READ_ONLY} - {@code retain().asReadOnly()}.
     * The unconnected variant is supplied by subclasses and is expected to honour the same constants.
     */
    protected enum WrapType {
        NONE, DUP, SLICE, READ_ONLY
    }
68  
69      @Test
70      public void testSimpleSendDirectByteBuf(TestInfo testInfo) throws Throwable {
71          run(testInfo, new Runner<Bootstrap, Bootstrap>() {
72              @Override
73              public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
74                  testSimpleSendDirectByteBuf(bootstrap, bootstrap2);
75              }
76          });
77      }
78  
79      public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
80          testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 1);
81          testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 4);
82      }
83  
84      @Test
85      public void testSimpleSendHeapByteBuf(TestInfo testInfo) throws Throwable {
86          run(testInfo, new Runner<Bootstrap, Bootstrap>() {
87              @Override
88              public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
89                  testSimpleSendHeapByteBuf(bootstrap, bootstrap2);
90              }
91          });
92      }
93  
94      public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
95          testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 1);
96          testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 4);
97      }
98  
99      @Test
100     public void testSimpleSendCompositeDirectByteBuf(TestInfo testInfo) throws Throwable {
101         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
102             @Override
103             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
104                 testSimpleSendCompositeDirectByteBuf(bootstrap, bootstrap2);
105             }
106         });
107     }
108 
109     public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
110         CompositeByteBuf buf = Unpooled.compositeBuffer();
111         buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
112         buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
113         testSimpleSend(sb, cb, buf, true, BYTES, 1);
114 
115         CompositeByteBuf buf2 = Unpooled.compositeBuffer();
116         buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
117         buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
118         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
119     }
120 
121     @Test
122     public void testSimpleSendCompositeHeapByteBuf(TestInfo testInfo) throws Throwable {
123         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
124             @Override
125             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
126                 testSimpleSendCompositeHeapByteBuf(bootstrap, bootstrap2);
127             }
128         });
129     }
130 
131     public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
132         CompositeByteBuf buf = Unpooled.compositeBuffer();
133         buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
134         buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
135         testSimpleSend(sb, cb, buf, true, BYTES, 1);
136 
137         CompositeByteBuf buf2 = Unpooled.compositeBuffer();
138         buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
139         buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
140         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
141     }
142 
143     @Test
144     public void testSimpleSendCompositeMixedByteBuf(TestInfo testInfo) throws Throwable {
145         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
146             @Override
147             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
148                 testSimpleSendCompositeMixedByteBuf(bootstrap, bootstrap2);
149             }
150         });
151     }
152 
153     public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
154         CompositeByteBuf buf = Unpooled.compositeBuffer();
155         buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
156         buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
157         testSimpleSend(sb, cb, buf, true, BYTES, 1);
158 
159         CompositeByteBuf buf2 = Unpooled.compositeBuffer();
160         buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
161         buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
162         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
163     }
164 
165     @Test
166     public void testSimpleSendWithoutBind(TestInfo testInfo) throws Throwable {
167         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
168             @Override
169             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
170                 testSimpleSendWithoutBind(bootstrap, bootstrap2);
171             }
172         });
173     }
174 
175     public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
176         testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 1);
177         testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 4);
178     }
179 
180     private void testSimpleSend(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
181                                 final byte[] bytes, int count) throws Throwable {
182         for (WrapType type: WrapType.values()) {
183             testSimpleSend0(sb, cb, buf.retain(), bindClient, bytes, count, type);
184         }
185         assertTrue(buf.release());
186     }
187 
188     @Test
189     public void testSimpleSendWithConnect(TestInfo testInfo) throws Throwable {
190         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
191             @Override
192             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
193                 testSimpleSendWithConnect(bootstrap, bootstrap2);
194             }
195         });
196     }
197 
198     public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
199         testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 1);
200         testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 4);
201     }
202 
203     @Test
204     public void testReceiveEmptyDatagrams(TestInfo testInfo) throws Throwable {
205         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
206             @Override
207             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
208                 testReceiveEmptyDatagrams(bootstrap, bootstrap2);
209             }
210         });
211     }
212 
213     public void testReceiveEmptyDatagrams(Bootstrap sb, Bootstrap cb) throws Throwable {
214         final Semaphore semaphore = new Semaphore(0);
215         Channel server = sb.handler(new ChannelInitializer<Channel>() {
216             @Override
217             protected void initChannel(Channel ch) throws Exception {
218                 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
219                     @Override
220                     protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
221                         semaphore.release();
222                     }
223                 });
224             }
225         }).bind(newSocketAddress()).sync().channel();
226 
227         SocketAddress address = server.localAddress();
228         DatagramSocket client;
229         try {
230             client = new DatagramSocket(newSocketAddress());
231         } catch (IllegalArgumentException e) {
232             assumeThat(e.getMessage()).doesNotContainIgnoringCase("unsupported address type");
233             throw e;
234         }
235         SocketAddress sendAddress = address instanceof InetSocketAddress ?
236                 sendToAddress((InetSocketAddress) address) : address;
237         for (int i = 0; i < 100; i++) {
238             try {
239                 client.send(new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0, sendAddress));
240             } catch (BindException e) {
241                 throw new TestAbortedException("JDK sockets do not support binding to these addresses.", e);
242             }
243             semaphore.acquire();
244         }
245     }
246 
247     @Test
248     public void testSendToUnresolvableAddress(TestInfo testInfo) throws Throwable {
249         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
250             @Override
251             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
252                 testSendToUnresolvableAddress(bootstrap, bootstrap2);
253             }
254         });
255     }
256 
257     public void testSendToUnresolvableAddress(Bootstrap sb, Bootstrap cb) throws Throwable {
258         SocketAddress serverAddress = newSocketAddress();
259         if (!(serverAddress instanceof InetSocketAddress)) {
260             return;
261         }
262         Channel sc = sb.handler(new ChannelInitializer<Channel>() {
263             @Override
264             protected void initChannel(Channel ch) throws Exception {
265                 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
266                     @Override
267                     protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
268                         // Just drop
269                     }
270                 });
271             }
272         }).bind(serverAddress).sync().channel();
273 
274         Channel cc = cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true).
275                 handler(new ChannelInboundHandlerAdapter()).register().sync().channel();
276         try {
277             InetSocketAddress goodHost = sendToAddress((InetSocketAddress) sc.localAddress());
278             InetSocketAddress unresolvedHost = new InetSocketAddress("NOT_A_REAL_ADDRESS", goodHost.getPort());
279 
280             assertFalse(goodHost.isUnresolved());
281             assertTrue(unresolvedHost.isUnresolved());
282 
283             String message = "hello world!";
284             cc.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), goodHost)).sync();
285             assertInstanceOf(UnresolvedAddressException.class, cc.writeAndFlush(new DatagramPacket(
286                     Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), unresolvedHost)).await().cause());
287 
288             // DatagramChannel should still be open after sending to unresolved address
289             assertTrue(cc.isOpen());
290 
291             // DatagramChannel should still be able to send messages outbound
292             cc.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), goodHost)).sync();
293             assertInstanceOf(UnresolvedAddressException.class, cc.writeAndFlush(new DatagramPacket(
294                     Unpooled.copiedBuffer(message, CharsetUtil.US_ASCII), unresolvedHost)).await().cause());
295             assertTrue(cc.isOpen());
296         } finally {
297             closeChannel(cc);
298             closeChannel(sc);
299         }
300     }
301 
302     @SuppressWarnings("deprecation")
303     private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
304                                 final byte[] bytes, int count, WrapType wrapType)
305             throws Throwable {
306         Channel sc = null;
307         Channel cc = null;
308 
309         try {
310             cb.handler(new SimpleChannelInboundHandler<Object>() {
311                 @Override
312                 public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
313                     // Nothing will be sent.
314                 }
315             });
316 
317             final SocketAddress sender;
318             if (bindClient) {
319                 cc = cb.bind(newSocketAddress()).sync().channel();
320                 sender = cc.localAddress();
321             } else {
322                 cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
323                 cc = cb.register().sync().channel();
324                 sender = null;
325             }
326 
327             final CountDownLatch latch = new CountDownLatch(count);
328             AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
329             sc = setupServerChannel(sb, bytes, sender, latch, errorRef, false);
330 
331             SocketAddress localAddr = sc.localAddress();
332             SocketAddress addr = localAddr instanceof InetSocketAddress ?
333                     sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
334             List<ChannelFuture> futures = new ArrayList<ChannelFuture>(count);
335             for (int i = 0; i < count; i++) {
336                 futures.add(write(cc, buf, addr, wrapType));
337             }
338             // release as we used buf.retain() before
339             cc.flush();
340 
341             for (ChannelFuture future: futures) {
342                 future.sync();
343             }
344             if (!latch.await(10, TimeUnit.SECONDS)) {
345                 Throwable error = errorRef.get();
346                 if (error != null) {
347                     throw error;
348                 }
349                 fail();
350             }
351         } finally {
352             // release as we used buf.retain() before
353             buf.release();
354 
355             closeChannel(cc);
356             closeChannel(sc);
357         }
358     }
359 
360     private void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count)
361             throws Throwable {
362         try {
363             for (WrapType type : WrapType.values()) {
364                 testSimpleSendWithConnect0(sb, cb, buf.retain(), bytes, count, type);
365             }
366         } finally {
367             assertTrue(buf.release());
368         }
369     }
370 
371     private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count,
372                                             WrapType wrapType) throws Throwable {
373         Channel sc = null;
374         Channel cc = null;
375         try {
376             final CountDownLatch latch = new CountDownLatch(count);
377             final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
378             final CountDownLatch clientLatch = new CountDownLatch(count);
379             final AtomicReference<Throwable> clientErrorRef = new AtomicReference<Throwable>();
380             cc = setupClientChannel(cb, bytes, clientLatch, clientErrorRef);
381             sc = setupServerChannel(sb, bytes, cc.localAddress(), latch, errorRef, true);
382 
383             SocketAddress localAddr = sc.localAddress();
384             SocketAddress addr = localAddr instanceof InetSocketAddress ?
385                     sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
386             cc.connect(addr).syncUninterruptibly();
387 
388             List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
389             for (int i = 0; i < count; i++) {
390                 futures.add(write(cc, buf, wrapType));
391             }
392             cc.flush();
393 
394             for (ChannelFuture future: futures) {
395                 future.sync();
396             }
397 
398             if (!latch.await(10, TimeUnit.SECONDS)) {
399                 Throwable cause = errorRef.get();
400                 if (cause != null) {
401                     throw cause;
402                 }
403                 fail();
404             }
405             if (!clientLatch.await(10, TimeUnit.SECONDS)) {
406                 Throwable cause = clientErrorRef.get();
407                 if (cause != null) {
408                     throw cause;
409                 }
410                 fail();
411             }
412             assertTrue(isConnected(cc));
413 
414             assertNotNull(cc.localAddress());
415             assertNotNull(cc.remoteAddress());
416 
417             if (supportDisconnect()) {
418                 try {
419                     // Test what happens when we call disconnect()
420                     cc.disconnect().syncUninterruptibly();
421                 } catch (Throwable e) {
422                     if (e instanceof SocketException) {
423                         if (disconnectMightFail((DatagramChannel) cc)) {
424                             return;
425                         }
426                     }
427                     throw e;
428                 }
429                 assertFalse(isConnected(cc));
430                 assertNotNull(cc.localAddress());
431                 assertNull(cc.remoteAddress());
432 
433                 ChannelFuture future = cc.writeAndFlush(
434                         buf.retain().duplicate()).awaitUninterruptibly();
435                 assertTrue(future.cause() instanceof NotYetConnectedException,
436                         "NotYetConnectedException expected, got: " + future.cause());
437             }
438         } finally {
439             // release as we used buf.retain() before
440             buf.release();
441 
442             closeChannel(cc);
443             closeChannel(sc);
444         }
445     }
446 
447     private static ChannelFuture write(Channel cc, ByteBuf buf, WrapType wrapType) {
448         switch (wrapType) {
449             case DUP:
450                 return cc.write(buf.retainedDuplicate());
451             case SLICE:
452                 return cc.write(buf.retainedSlice());
453             case READ_ONLY:
454                 return cc.write(buf.retain().asReadOnly());
455             case NONE:
456                 return cc.write(buf.retain());
457             default:
458                 throw new Error("unknown wrap type: " + wrapType);
459         }
460     }
461 
    /**
     * Reports whether {@code channel} is currently connected. The connected send test expects
     * {@code true} after {@code connect(...)} and {@code false} after {@code disconnect()}.
     */
    protected abstract boolean isConnected(Channel channel);
463 
    /**
     * Creates the client channel for the connected send test. The caller connects the returned channel
     * itself and passes the channel's local address to the server set-up as the expected sender.
     *
     * @param cb       bootstrap to create the channel from
     * @param bytes    payload bytes used by the test
     *                 (presumably what the client expects to receive back - confirm in implementations)
     * @param latch    latch the caller awaits for up to 10 seconds; created with the datagram count
     * @param errorRef holder whose value, if set, is rethrown by the caller when {@code latch} times out
     * @return the client channel; it must already have a local address
     */
    protected abstract Channel setupClientChannel(Bootstrap cb, byte[] bytes, CountDownLatch latch,
                                                  AtomicReference<Throwable> errorRef) throws Throwable;
466 
    /**
     * Creates the server channel that the send tests write to. The caller reads the returned channel's
     * local address to determine the destination.
     *
     * @param sb       bootstrap to create the channel from
     * @param bytes    payload bytes used by the test
     *                 (presumably the content the server should expect - confirm in implementations)
     * @param sender   local address of the client, or {@code null} when the client was not bound
     * @param latch    latch the caller awaits for up to 10 seconds; created with the datagram count
     * @param errorRef holder whose value, if set, is rethrown by the caller when {@code latch} times out
     * @param echo     {@code true} in the connected send test, {@code false} in the unconnected one
     * @return the server channel
     */
    protected abstract Channel setupServerChannel(Bootstrap sb, byte[] bytes, SocketAddress sender,
                                                  CountDownLatch latch, AtomicReference<Throwable> errorRef,
                                                  boolean echo) throws Throwable;
470 
    /**
     * Decides whether the connected send test also exercises {@code disconnect()} and the write that
     * must fail afterwards; when {@code false} that part is skipped.
     */
    protected abstract boolean supportDisconnect();
472 
    /**
     * Tells the connected send test whether a {@link SocketException} raised by {@code disconnect()}
     * is tolerated for {@code channel}. When this returns {@code true} the test ends early without
     * failing; the default of {@code false} makes the exception propagate.
     */
    protected boolean disconnectMightFail(DatagramChannel channel) {
        return false;
    }
476 
    /**
     * Queues one write of {@code buf} addressed to {@code remote} on the unconnected channel
     * {@code cc}, wrapped according to {@code wrapType}. The caller flushes {@code cc} itself after
     * queuing all writes and then syncs on every returned future.
     */
    protected abstract ChannelFuture write(Channel cc, ByteBuf buf, SocketAddress remote, WrapType wrapType);
478 
479     protected static void closeChannel(Channel channel) throws Exception {
480         if (channel != null) {
481             channel.close().sync();
482         }
483     }
484 
485     protected InetSocketAddress sendToAddress(InetSocketAddress serverAddress) {
486         InetAddress addr = serverAddress.getAddress();
487         if (addr.isAnyLocalAddress()) {
488             if (addr instanceof Inet6Address) {
489                 return new InetSocketAddress(NetUtil.LOCALHOST6, serverAddress.getPort());
490             }
491             return new InetSocketAddress(NetUtil.LOCALHOST4, serverAddress.getPort());
492         }
493         return serverAddress;
494     }
495 }