View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.buffer.api.Buffer;
20  import io.netty5.buffer.api.BufferAllocator;
21  import io.netty5.buffer.api.CompositeBuffer;
22  import io.netty5.buffer.api.DefaultBufferAllocators;
23  import io.netty5.channel.Channel;
24  import io.netty5.channel.ChannelHandlerContext;
25  import io.netty5.channel.ChannelOption;
26  import io.netty5.channel.SimpleChannelInboundHandler;
27  import io.netty5.channel.socket.DatagramChannel;
28  import io.netty5.util.NetUtil;
29  import io.netty5.util.concurrent.Future;
30  import org.junit.jupiter.api.Test;
31  import org.junit.jupiter.api.TestInfo;
32  
33  import java.net.Inet6Address;
34  import java.net.InetAddress;
35  import java.net.InetSocketAddress;
36  import java.net.SocketAddress;
37  import java.net.SocketException;
38  import java.nio.channels.NotYetConnectedException;
39  import java.util.ArrayList;
40  import java.util.List;
41  import java.util.concurrent.CompletionException;
42  import java.util.concurrent.CountDownLatch;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicReference;
45  
46  import static java.util.Arrays.asList;
47  import static org.assertj.core.api.Assertions.assertThat;
48  import static org.junit.jupiter.api.Assertions.assertFalse;
49  import static org.junit.jupiter.api.Assertions.assertNotNull;
50  import static org.junit.jupiter.api.Assertions.assertNull;
51  import static org.junit.jupiter.api.Assertions.assertTrue;
52  import static org.junit.jupiter.api.Assertions.fail;
53  
54  public abstract class DatagramUnicastTest extends AbstractDatagramTest {
55  
56      private static final byte[] BYTES = {0, 1, 2, 3};
57  
58      @Test
59      public void testSimpleSendDirectBuffer(TestInfo testInfo) throws Throwable {
60          run(testInfo, this::testSimpleSendDirectBuffer);
61      }
62  
63      public void testSimpleSendDirectBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
64          testSimpleSend(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), true, BYTES, 1);
65          testSimpleSend(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), true, BYTES, 4);
66      }
67  
68      @Test
69      public void testSimpleSendHeapBuffer(TestInfo testInfo) throws Throwable {
70          run(testInfo, this::testSimpleSendHeapBuffer);
71      }
72  
73      public void testSimpleSendHeapBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
74          testSimpleSend(sb, cb, DefaultBufferAllocators.onHeapAllocator().copyOf(BYTES), true, BYTES, 1);
75          testSimpleSend(sb, cb, DefaultBufferAllocators.onHeapAllocator().copyOf(BYTES), true, BYTES, 4);
76      }
77  
78      @Test
79      public void testSimpleSendCompositeDirectBuffer(TestInfo testInfo) throws Throwable {
80          run(testInfo, this::testSimpleSendCompositeDirectBuffer);
81      }
82  
83      public void testSimpleSendCompositeDirectBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
84          BufferAllocator alloc = DefaultBufferAllocators.offHeapAllocator();
85          try (Buffer data = alloc.copyOf(BYTES)) {
86              CompositeBuffer buf = alloc.compose(asList(data.readSplit(2).send(), data.readSplit(2).send()));
87              testSimpleSend(sb, cb, buf, true, BYTES, 1);
88          }
89          try (Buffer data = alloc.copyOf(BYTES)) {
90              CompositeBuffer buf = alloc.compose(asList(data.readSplit(2).send(), data.readSplit(2).send()));
91              testSimpleSend(sb, cb, buf, true, BYTES, 4);
92          }
93      }
94  
95      @Test
96      public void testSimpleSendCompositeHeapBuffer(TestInfo testInfo) throws Throwable {
97          run(testInfo, this::testSimpleSendCompositeHeapBuffer);
98      }
99  
100     public void testSimpleSendCompositeHeapBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
101         BufferAllocator alloc = DefaultBufferAllocators.onHeapAllocator();
102         try (Buffer data = alloc.copyOf(BYTES)) {
103             CompositeBuffer buf = alloc.compose(asList(data.readSplit(2).send(), data.readSplit(2).send()));
104             testSimpleSend(sb, cb, buf, true, BYTES, 1);
105         }
106         try (Buffer data = alloc.copyOf(BYTES)) {
107             CompositeBuffer buf = alloc.compose(asList(data.readSplit(2).send(), data.readSplit(2).send()));
108             testSimpleSend(sb, cb, buf, true, BYTES, 4);
109         }
110     }
111 
112     @Test
113     public void testSimpleSendCompositeMixedBuffer(TestInfo testInfo) throws Throwable {
114         run(testInfo, this::testSimpleSendCompositeMixedBuffer);
115     }
116 
117     public void testSimpleSendCompositeMixedBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
118         BufferAllocator offHeap = DefaultBufferAllocators.offHeapAllocator();
119         BufferAllocator onHeap = DefaultBufferAllocators.onHeapAllocator();
120         CompositeBuffer buf = offHeap.compose(asList(
121                 offHeap.allocate(2).writeBytes(BYTES, 0, 2).send(),
122                 onHeap.allocate(2).writeBytes(BYTES, 0, 2).send()));
123         testSimpleSend(sb, cb, buf, true, BYTES, 1);
124 
125         CompositeBuffer buf2 = offHeap.compose(asList(
126                 offHeap.allocate(2).writeBytes(BYTES, 0, 2).send(),
127                 onHeap.allocate(2).writeBytes(BYTES, 0, 2).send()));
128         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
129     }
130 
131     @Test
132     public void testSimpleSendWithoutBind(TestInfo testInfo) throws Throwable {
133         run(testInfo, this::testSimpleSendWithoutBind);
134     }
135 
136     public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
137         testSimpleSend(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), false, BYTES, 1);
138         testSimpleSend(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), false, BYTES, 4);
139     }
140 
141     private void testSimpleSend(Bootstrap sb, Bootstrap cb, Buffer buf, boolean bindClient,
142                                 final byte[] bytes, int count) throws Throwable {
143         testSimpleSend0(sb, cb, buf, bindClient, bytes, count);
144         assertFalse(buf.isAccessible());
145     }
146 
147     @Test
148     public void testSimpleSendWithConnect(TestInfo testInfo) throws Throwable {
149         run(testInfo, this::testSimpleSendWithConnect);
150     }
151 
152     public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
153         testSimpleSendWithConnect(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), BYTES, 1);
154         testSimpleSendWithConnect(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), BYTES, 4);
155     }
156 
157     @SuppressWarnings("deprecation")
158     private void testSimpleSend0(Bootstrap sb, Bootstrap cb, Buffer buf, boolean bindClient,
159                                 final byte[] bytes, int count) throws Throwable {
160         Channel sc = null;
161         Channel cc = null;
162 
163         try (buf) {
164             cb.handler(new SimpleChannelInboundHandler<>() {
165                 @Override
166                 public void messageReceived(ChannelHandlerContext ctx, Object msgs) {
167                     // Nothing will be sent.
168                 }
169             });
170 
171             final SocketAddress sender;
172             if (bindClient) {
173                 cc = cb.bind(newSocketAddress()).asStage().get();
174                 sender = cc.localAddress();
175             } else {
176                 cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
177                 cc = cb.register().asStage().get();
178                 sender = null;
179             }
180 
181             final CountDownLatch latch = new CountDownLatch(count);
182             AtomicReference<Throwable> errorRef = new AtomicReference<>();
183             sc = setupServerChannel(sb, bytes, sender, latch, errorRef, false);
184 
185             SocketAddress localAddr = sc.localAddress();
186             SocketAddress addr = localAddr instanceof InetSocketAddress ?
187                     sendToAddress((InetSocketAddress) localAddr) : localAddr;
188             List<Future<Void>> futures = new ArrayList<>(count);
189             for (int i = 0; i < count; i++) {
190                 futures.add(write(cc, buf.copy(), addr));
191             }
192             // release as we used buf.retain() before
193             cc.flush();
194 
195             for (Future<Void> future: futures) {
196                 future.asStage().sync();
197             }
198             if (!latch.await(10, TimeUnit.SECONDS)) {
199                 Throwable error = errorRef.get();
200                 if (error != null) {
201                     throw error;
202                 }
203                 fail();
204             }
205         } finally {
206             closeChannel(cc);
207             closeChannel(sc);
208         }
209     }
210 
211     private void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb, Buffer buf, final byte[] bytes, int count)
212             throws Throwable {
213         testSimpleSendWithConnect0(sb, cb, buf, bytes, count);
214         assertFalse(buf.isAccessible());
215     }
216 
217     private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, Buffer buf, final byte[] bytes, int count)
218             throws Throwable {
219         Channel sc = null;
220         Channel cc = null;
221         try (buf) {
222             final CountDownLatch latch = new CountDownLatch(count);
223             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
224             final CountDownLatch clientLatch = new CountDownLatch(count);
225             final AtomicReference<Throwable> clientErrorRef = new AtomicReference<>();
226             cc = setupClientChannel(cb, bytes, clientLatch, clientErrorRef);
227             sc = setupServerChannel(sb, bytes, cc.localAddress(), latch, errorRef, true);
228 
229             SocketAddress localAddr = sc.localAddress();
230             SocketAddress addr = localAddr instanceof InetSocketAddress ?
231                     sendToAddress((InetSocketAddress) localAddr) : localAddr;
232             cc.connect(addr).asStage().sync();
233 
234             List<Future<Void>> futures = new ArrayList<>();
235             for (int i = 0; i < count; i++) {
236                 futures.add(cc.write(buf.copy()));
237             }
238             cc.flush();
239 
240             for (Future<Void> future: futures) {
241                 future.asStage().sync();
242             }
243 
244             if (!latch.await(10, TimeUnit.SECONDS)) {
245                 Throwable cause = errorRef.get();
246                 if (cause != null) {
247                     throw cause;
248                 }
249                 fail("timed out waiting for latch(" +
250                      "initial count: " + count + ", current count: " + latch.getCount() + ')');
251             }
252             if (!clientLatch.await(10, TimeUnit.SECONDS)) {
253                 Throwable cause = clientErrorRef.get();
254                 if (cause != null) {
255                     throw cause;
256                 }
257                 fail("timed out waiting for clientLatch(" +
258                      "initial count: " + count + ", current count: " + clientLatch.getCount() +
259                      ')');
260             }
261             assertTrue(isConnected(cc));
262 
263             assertNotNull(cc.localAddress());
264             assertNotNull(cc.remoteAddress());
265 
266             if (supportDisconnect()) {
267                 try {
268                     // Test what happens when we call disconnect()
269                     cc.disconnect().asStage().sync();
270                 } catch (CompletionException e) {
271                     if (e.getCause() instanceof SocketException) {
272                         if (disconnectMightFail((DatagramChannel) cc)) {
273                             return;
274                         }
275                     }
276                     throw e;
277                 }
278 
279                 // Test what happens when we call disconnect()
280                 assertFalse(isConnected(cc));
281                 assertNotNull(cc.localAddress());
282                 assertNull(cc.remoteAddress());
283 
284                 Throwable cause = cc.writeAndFlush(buf.copy()).asStage().getCause();
285                 assertThat(cause).isInstanceOf(NotYetConnectedException.class);
286             }
287         } finally {
288             closeChannel(cc);
289             closeChannel(sc);
290         }
291     }
292 
293     protected abstract boolean isConnected(Channel channel);
294 
295     protected abstract Channel setupClientChannel(Bootstrap cb, byte[] bytes, CountDownLatch latch,
296                                                   AtomicReference<Throwable> errorRef) throws Throwable;
297 
298     protected abstract Channel setupServerChannel(Bootstrap sb, byte[] bytes, SocketAddress sender,
299                                                   CountDownLatch latch, AtomicReference<Throwable> errorRef,
300                                                   boolean echo) throws Throwable;
301 
302     protected abstract boolean supportDisconnect();
303 
304     protected boolean disconnectMightFail(DatagramChannel channel) {
305         return false;
306     }
307 
308     protected abstract Future<Void> write(Channel cc, Buffer buf, SocketAddress remote);
309 
310     protected static void closeChannel(Channel channel) throws Exception {
311         if (channel != null) {
312             channel.close().asStage().sync();
313         }
314     }
315 
316     protected InetSocketAddress sendToAddress(InetSocketAddress serverAddress) {
317         InetAddress addr = serverAddress.getAddress();
318         if (addr.isAnyLocalAddress()) {
319             if (addr instanceof Inet6Address) {
320                 return new InetSocketAddress(NetUtil.LOCALHOST6, serverAddress.getPort());
321             }
322             return new InetSocketAddress(NetUtil.LOCALHOST4, serverAddress.getPort());
323         }
324         return serverAddress;
325     }
326 }