View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelOption;
26  import io.netty.channel.SimpleChannelInboundHandler;
27  import io.netty.util.NetUtil;
28  import org.junit.jupiter.api.Test;
29  import org.junit.jupiter.api.TestInfo;
30  
31  import java.net.Inet6Address;
32  import java.net.InetAddress;
33  import java.net.InetSocketAddress;
34  import java.net.SocketAddress;
35  import java.nio.channels.NotYetConnectedException;
36  import java.util.ArrayList;
37  import java.util.List;
38  import java.util.concurrent.CountDownLatch;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import static org.junit.jupiter.api.Assertions.assertFalse;
43  import static org.junit.jupiter.api.Assertions.assertNotNull;
44  import static org.junit.jupiter.api.Assertions.assertNull;
45  import static org.junit.jupiter.api.Assertions.assertTrue;
46  import static org.junit.jupiter.api.Assertions.fail;
47  
48  public abstract class DatagramUnicastTest extends AbstractDatagramTest {
49  
50      private static final byte[] BYTES = {0, 1, 2, 3};
51      protected enum WrapType {
52          NONE, DUP, SLICE, READ_ONLY
53      }
54  
55      @Test
56      public void testSimpleSendDirectByteBuf(TestInfo testInfo) throws Throwable {
57          run(testInfo, new Runner<Bootstrap, Bootstrap>() {
58              @Override
59              public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
60                  testSimpleSendDirectByteBuf(bootstrap, bootstrap2);
61              }
62          });
63      }
64  
65      public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
66          testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 1);
67          testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 4);
68      }
69  
70      @Test
71      public void testSimpleSendHeapByteBuf(TestInfo testInfo) throws Throwable {
72          run(testInfo, new Runner<Bootstrap, Bootstrap>() {
73              @Override
74              public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
75                  testSimpleSendHeapByteBuf(bootstrap, bootstrap2);
76              }
77          });
78      }
79  
80      public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
81          testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 1);
82          testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 4);
83      }
84  
85      @Test
86      public void testSimpleSendCompositeDirectByteBuf(TestInfo testInfo) throws Throwable {
87          run(testInfo, new Runner<Bootstrap, Bootstrap>() {
88              @Override
89              public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
90                  testSimpleSendCompositeDirectByteBuf(bootstrap, bootstrap2);
91              }
92          });
93      }
94  
95      public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
96          CompositeByteBuf buf = Unpooled.compositeBuffer();
97          buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
98          buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
99          testSimpleSend(sb, cb, buf, true, BYTES, 1);
100 
101         CompositeByteBuf buf2 = Unpooled.compositeBuffer();
102         buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
103         buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
104         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
105     }
106 
107     @Test
108     public void testSimpleSendCompositeHeapByteBuf(TestInfo testInfo) throws Throwable {
109         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
110             @Override
111             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
112                 testSimpleSendCompositeHeapByteBuf(bootstrap, bootstrap2);
113             }
114         });
115     }
116 
117     public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
118         CompositeByteBuf buf = Unpooled.compositeBuffer();
119         buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
120         buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
121         testSimpleSend(sb, cb, buf, true, BYTES, 1);
122 
123         CompositeByteBuf buf2 = Unpooled.compositeBuffer();
124         buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
125         buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
126         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
127     }
128 
129     @Test
130     public void testSimpleSendCompositeMixedByteBuf(TestInfo testInfo) throws Throwable {
131         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
132             @Override
133             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
134                 testSimpleSendCompositeMixedByteBuf(bootstrap, bootstrap2);
135             }
136         });
137     }
138 
139     public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
140         CompositeByteBuf buf = Unpooled.compositeBuffer();
141         buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
142         buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
143         testSimpleSend(sb, cb, buf, true, BYTES, 1);
144 
145         CompositeByteBuf buf2 = Unpooled.compositeBuffer();
146         buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
147         buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
148         testSimpleSend(sb, cb, buf2, true, BYTES, 4);
149     }
150 
151     @Test
152     public void testSimpleSendWithoutBind(TestInfo testInfo) throws Throwable {
153         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
154             @Override
155             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
156                 testSimpleSendWithoutBind(bootstrap, bootstrap2);
157             }
158         });
159     }
160 
161     public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
162         testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 1);
163         testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 4);
164     }
165 
166     private void testSimpleSend(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
167                                 final byte[] bytes, int count) throws Throwable {
168         for (WrapType type: WrapType.values()) {
169             testSimpleSend0(sb, cb, buf.retain(), bindClient, bytes, count, type);
170         }
171         assertTrue(buf.release());
172     }
173 
174     @Test
175     public void testSimpleSendWithConnect(TestInfo testInfo) throws Throwable {
176         run(testInfo, new Runner<Bootstrap, Bootstrap>() {
177             @Override
178             public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
179                 testSimpleSendWithConnect(bootstrap, bootstrap2);
180             }
181         });
182     }
183 
184     public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
185         testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 1);
186         testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 4);
187     }
188 
189     @SuppressWarnings("deprecation")
190     private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
191                                 final byte[] bytes, int count, WrapType wrapType)
192             throws Throwable {
193         Channel sc = null;
194         Channel cc = null;
195 
196         try {
197             cb.handler(new SimpleChannelInboundHandler<Object>() {
198                 @Override
199                 public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
200                     // Nothing will be sent.
201                 }
202             });
203 
204             final SocketAddress sender;
205             if (bindClient) {
206                 cc = cb.bind(newSocketAddress()).sync().channel();
207                 sender = cc.localAddress();
208             } else {
209                 cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
210                 cc = cb.register().sync().channel();
211                 sender = null;
212             }
213 
214             final CountDownLatch latch = new CountDownLatch(count);
215             AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
216             sc = setupServerChannel(sb, bytes, sender, latch, errorRef, false);
217 
218             SocketAddress localAddr = sc.localAddress();
219             SocketAddress addr = localAddr instanceof InetSocketAddress ?
220                     sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
221             List<ChannelFuture> futures = new ArrayList<ChannelFuture>(count);
222             for (int i = 0; i < count; i++) {
223                 futures.add(write(cc, buf, addr, wrapType));
224             }
225             // release as we used buf.retain() before
226             cc.flush();
227 
228             for (ChannelFuture future: futures) {
229                 future.sync();
230             }
231             if (!latch.await(10, TimeUnit.SECONDS)) {
232                 Throwable error = errorRef.get();
233                 if (error != null) {
234                     throw error;
235                 }
236                 fail();
237             }
238         } finally {
239             // release as we used buf.retain() before
240             buf.release();
241 
242             closeChannel(cc);
243             closeChannel(sc);
244         }
245     }
246 
247     private void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count)
248             throws Throwable {
249         try {
250             for (WrapType type : WrapType.values()) {
251                 testSimpleSendWithConnect0(sb, cb, buf.retain(), bytes, count, type);
252             }
253         } finally {
254             assertTrue(buf.release());
255         }
256     }
257 
258     private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count,
259                                             WrapType wrapType) throws Throwable {
260         Channel sc = null;
261         Channel cc = null;
262         try {
263             final CountDownLatch latch = new CountDownLatch(count);
264             final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
265             final CountDownLatch clientLatch = new CountDownLatch(count);
266             final AtomicReference<Throwable> clientErrorRef = new AtomicReference<Throwable>();
267             cc = setupClientChannel(cb, bytes, clientLatch, clientErrorRef);
268             sc = setupServerChannel(sb, bytes, cc.localAddress(), latch, errorRef, true);
269 
270             SocketAddress localAddr = sc.localAddress();
271             SocketAddress addr = localAddr instanceof InetSocketAddress ?
272                     sendToAddress((InetSocketAddress) sc.localAddress()) : localAddr;
273             cc.connect(addr).syncUninterruptibly();
274 
275             List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
276             for (int i = 0; i < count; i++) {
277                 futures.add(write(cc, buf, wrapType));
278             }
279             cc.flush();
280 
281             for (ChannelFuture future: futures) {
282                 future.sync();
283             }
284 
285             if (!latch.await(10, TimeUnit.SECONDS)) {
286                 Throwable cause = errorRef.get();
287                 if (cause != null) {
288                     throw cause;
289                 }
290                 fail();
291             }
292             if (!clientLatch.await(10, TimeUnit.SECONDS)) {
293                 Throwable cause = clientErrorRef.get();
294                 if (cause != null) {
295                     throw cause;
296                 }
297                 fail();
298             }
299             assertTrue(isConnected(cc));
300 
301             assertNotNull(cc.localAddress());
302             assertNotNull(cc.remoteAddress());
303 
304             if (supportDisconnect()) {
305                 // Test what happens when we call disconnect()
306                 cc.disconnect().syncUninterruptibly();
307                 assertFalse(isConnected(cc));
308                 assertNotNull(cc.localAddress());
309                 assertNull(cc.remoteAddress());
310 
311                 ChannelFuture future = cc.writeAndFlush(
312                         buf.retain().duplicate()).awaitUninterruptibly();
313                 assertTrue(future.cause() instanceof NotYetConnectedException,
314                         "NotYetConnectedException expected, got: " + future.cause());
315             }
316         } finally {
317             // release as we used buf.retain() before
318             buf.release();
319 
320             closeChannel(cc);
321             closeChannel(sc);
322         }
323     }
324 
325     private static ChannelFuture write(Channel cc, ByteBuf buf, WrapType wrapType) {
326         switch (wrapType) {
327             case DUP:
328                 return cc.write(buf.retainedDuplicate());
329             case SLICE:
330                 return cc.write(buf.retainedSlice());
331             case READ_ONLY:
332                 return cc.write(buf.retain().asReadOnly());
333             case NONE:
334                 return cc.write(buf.retain());
335             default:
336                 throw new Error("unknown wrap type: " + wrapType);
337         }
338     }
339 
340     protected abstract boolean isConnected(Channel channel);
341 
342     protected abstract Channel setupClientChannel(Bootstrap cb, byte[] bytes, CountDownLatch latch,
343                                                   AtomicReference<Throwable> errorRef) throws Throwable;
344 
345     protected abstract Channel setupServerChannel(Bootstrap sb, byte[] bytes, SocketAddress sender,
346                                                   CountDownLatch latch, AtomicReference<Throwable> errorRef,
347                                                   boolean echo) throws Throwable;
348 
349     protected abstract boolean supportDisconnect();
350 
351     protected abstract ChannelFuture write(Channel cc, ByteBuf buf, SocketAddress remote, WrapType wrapType);
352 
353     protected static void closeChannel(Channel channel) throws Exception {
354         if (channel != null) {
355             channel.close().sync();
356         }
357     }
358 
359     protected InetSocketAddress sendToAddress(InetSocketAddress serverAddress) {
360         InetAddress addr = serverAddress.getAddress();
361         if (addr.isAnyLocalAddress()) {
362             if (addr instanceof Inet6Address) {
363                 return new InetSocketAddress(NetUtil.LOCALHOST6, serverAddress.getPort());
364             }
365             return new InetSocketAddress(NetUtil.LOCALHOST4, serverAddress.getPort());
366         }
367         return serverAddress;
368     }
369 }