1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.buffer.api.BufferAllocator;
21 import io.netty5.buffer.api.CompositeBuffer;
22 import io.netty5.buffer.api.DefaultBufferAllocators;
23 import io.netty5.channel.Channel;
24 import io.netty5.channel.ChannelHandlerContext;
25 import io.netty5.channel.ChannelOption;
26 import io.netty5.channel.SimpleChannelInboundHandler;
27 import io.netty5.channel.socket.DatagramChannel;
28 import io.netty5.util.NetUtil;
29 import io.netty5.util.concurrent.Future;
30 import org.junit.jupiter.api.Test;
31 import org.junit.jupiter.api.TestInfo;
32
33 import java.net.Inet6Address;
34 import java.net.InetAddress;
35 import java.net.InetSocketAddress;
36 import java.net.SocketAddress;
37 import java.net.SocketException;
38 import java.nio.channels.NotYetConnectedException;
39 import java.util.ArrayList;
40 import java.util.List;
41 import java.util.concurrent.CompletionException;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicReference;
45
46 import static java.util.Arrays.asList;
47 import static org.assertj.core.api.Assertions.assertThat;
48 import static org.junit.jupiter.api.Assertions.assertFalse;
49 import static org.junit.jupiter.api.Assertions.assertNotNull;
50 import static org.junit.jupiter.api.Assertions.assertNull;
51 import static org.junit.jupiter.api.Assertions.assertTrue;
52 import static org.junit.jupiter.api.Assertions.fail;
53
54 public abstract class DatagramUnicastTest extends AbstractDatagramTest {
55
56 private static final byte[] BYTES = {0, 1, 2, 3};
57
58 @Test
59 public void testSimpleSendDirectBuffer(TestInfo testInfo) throws Throwable {
60 run(testInfo, this::testSimpleSendDirectBuffer);
61 }
62
63 public void testSimpleSendDirectBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
64 testSimpleSend(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), true, BYTES, 1);
65 testSimpleSend(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), true, BYTES, 4);
66 }
67
68 @Test
69 public void testSimpleSendHeapBuffer(TestInfo testInfo) throws Throwable {
70 run(testInfo, this::testSimpleSendHeapBuffer);
71 }
72
73 public void testSimpleSendHeapBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
74 testSimpleSend(sb, cb, DefaultBufferAllocators.onHeapAllocator().copyOf(BYTES), true, BYTES, 1);
75 testSimpleSend(sb, cb, DefaultBufferAllocators.onHeapAllocator().copyOf(BYTES), true, BYTES, 4);
76 }
77
78 @Test
79 public void testSimpleSendCompositeDirectBuffer(TestInfo testInfo) throws Throwable {
80 run(testInfo, this::testSimpleSendCompositeDirectBuffer);
81 }
82
83 public void testSimpleSendCompositeDirectBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
84 BufferAllocator alloc = DefaultBufferAllocators.offHeapAllocator();
85 try (Buffer data = alloc.copyOf(BYTES)) {
86 CompositeBuffer buf = alloc.compose(asList(data.readSplit(2).send(), data.readSplit(2).send()));
87 testSimpleSend(sb, cb, buf, true, BYTES, 1);
88 }
89 try (Buffer data = alloc.copyOf(BYTES)) {
90 CompositeBuffer buf = alloc.compose(asList(data.readSplit(2).send(), data.readSplit(2).send()));
91 testSimpleSend(sb, cb, buf, true, BYTES, 4);
92 }
93 }
94
95 @Test
96 public void testSimpleSendCompositeHeapBuffer(TestInfo testInfo) throws Throwable {
97 run(testInfo, this::testSimpleSendCompositeHeapBuffer);
98 }
99
100 public void testSimpleSendCompositeHeapBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
101 BufferAllocator alloc = DefaultBufferAllocators.onHeapAllocator();
102 try (Buffer data = alloc.copyOf(BYTES)) {
103 CompositeBuffer buf = alloc.compose(asList(data.readSplit(2).send(), data.readSplit(2).send()));
104 testSimpleSend(sb, cb, buf, true, BYTES, 1);
105 }
106 try (Buffer data = alloc.copyOf(BYTES)) {
107 CompositeBuffer buf = alloc.compose(asList(data.readSplit(2).send(), data.readSplit(2).send()));
108 testSimpleSend(sb, cb, buf, true, BYTES, 4);
109 }
110 }
111
112 @Test
113 public void testSimpleSendCompositeMixedBuffer(TestInfo testInfo) throws Throwable {
114 run(testInfo, this::testSimpleSendCompositeMixedBuffer);
115 }
116
117 public void testSimpleSendCompositeMixedBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
118 BufferAllocator offHeap = DefaultBufferAllocators.offHeapAllocator();
119 BufferAllocator onHeap = DefaultBufferAllocators.onHeapAllocator();
120 CompositeBuffer buf = offHeap.compose(asList(
121 offHeap.allocate(2).writeBytes(BYTES, 0, 2).send(),
122 onHeap.allocate(2).writeBytes(BYTES, 0, 2).send()));
123 testSimpleSend(sb, cb, buf, true, BYTES, 1);
124
125 CompositeBuffer buf2 = offHeap.compose(asList(
126 offHeap.allocate(2).writeBytes(BYTES, 0, 2).send(),
127 onHeap.allocate(2).writeBytes(BYTES, 0, 2).send()));
128 testSimpleSend(sb, cb, buf2, true, BYTES, 4);
129 }
130
131 @Test
132 public void testSimpleSendWithoutBind(TestInfo testInfo) throws Throwable {
133 run(testInfo, this::testSimpleSendWithoutBind);
134 }
135
136 public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
137 testSimpleSend(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), false, BYTES, 1);
138 testSimpleSend(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), false, BYTES, 4);
139 }
140
141 private void testSimpleSend(Bootstrap sb, Bootstrap cb, Buffer buf, boolean bindClient,
142 final byte[] bytes, int count) throws Throwable {
143 testSimpleSend0(sb, cb, buf, bindClient, bytes, count);
144 assertFalse(buf.isAccessible());
145 }
146
147 @Test
148 public void testSimpleSendWithConnect(TestInfo testInfo) throws Throwable {
149 run(testInfo, this::testSimpleSendWithConnect);
150 }
151
152 public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
153 testSimpleSendWithConnect(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), BYTES, 1);
154 testSimpleSendWithConnect(sb, cb, DefaultBufferAllocators.offHeapAllocator().copyOf(BYTES), BYTES, 4);
155 }
156
157 @SuppressWarnings("deprecation")
158 private void testSimpleSend0(Bootstrap sb, Bootstrap cb, Buffer buf, boolean bindClient,
159 final byte[] bytes, int count) throws Throwable {
160 Channel sc = null;
161 Channel cc = null;
162
163 try (buf) {
164 cb.handler(new SimpleChannelInboundHandler<>() {
165 @Override
166 public void messageReceived(ChannelHandlerContext ctx, Object msgs) {
167
168 }
169 });
170
171 final SocketAddress sender;
172 if (bindClient) {
173 cc = cb.bind(newSocketAddress()).asStage().get();
174 sender = cc.localAddress();
175 } else {
176 cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
177 cc = cb.register().asStage().get();
178 sender = null;
179 }
180
181 final CountDownLatch latch = new CountDownLatch(count);
182 AtomicReference<Throwable> errorRef = new AtomicReference<>();
183 sc = setupServerChannel(sb, bytes, sender, latch, errorRef, false);
184
185 SocketAddress localAddr = sc.localAddress();
186 SocketAddress addr = localAddr instanceof InetSocketAddress ?
187 sendToAddress((InetSocketAddress) localAddr) : localAddr;
188 List<Future<Void>> futures = new ArrayList<>(count);
189 for (int i = 0; i < count; i++) {
190 futures.add(write(cc, buf.copy(), addr));
191 }
192
193 cc.flush();
194
195 for (Future<Void> future: futures) {
196 future.asStage().sync();
197 }
198 if (!latch.await(10, TimeUnit.SECONDS)) {
199 Throwable error = errorRef.get();
200 if (error != null) {
201 throw error;
202 }
203 fail();
204 }
205 } finally {
206 closeChannel(cc);
207 closeChannel(sc);
208 }
209 }
210
211 private void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb, Buffer buf, final byte[] bytes, int count)
212 throws Throwable {
213 testSimpleSendWithConnect0(sb, cb, buf, bytes, count);
214 assertFalse(buf.isAccessible());
215 }
216
217 private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, Buffer buf, final byte[] bytes, int count)
218 throws Throwable {
219 Channel sc = null;
220 Channel cc = null;
221 try (buf) {
222 final CountDownLatch latch = new CountDownLatch(count);
223 final AtomicReference<Throwable> errorRef = new AtomicReference<>();
224 final CountDownLatch clientLatch = new CountDownLatch(count);
225 final AtomicReference<Throwable> clientErrorRef = new AtomicReference<>();
226 cc = setupClientChannel(cb, bytes, clientLatch, clientErrorRef);
227 sc = setupServerChannel(sb, bytes, cc.localAddress(), latch, errorRef, true);
228
229 SocketAddress localAddr = sc.localAddress();
230 SocketAddress addr = localAddr instanceof InetSocketAddress ?
231 sendToAddress((InetSocketAddress) localAddr) : localAddr;
232 cc.connect(addr).asStage().sync();
233
234 List<Future<Void>> futures = new ArrayList<>();
235 for (int i = 0; i < count; i++) {
236 futures.add(cc.write(buf.copy()));
237 }
238 cc.flush();
239
240 for (Future<Void> future: futures) {
241 future.asStage().sync();
242 }
243
244 if (!latch.await(10, TimeUnit.SECONDS)) {
245 Throwable cause = errorRef.get();
246 if (cause != null) {
247 throw cause;
248 }
249 fail("timed out waiting for latch(" +
250 "initial count: " + count + ", current count: " + latch.getCount() + ')');
251 }
252 if (!clientLatch.await(10, TimeUnit.SECONDS)) {
253 Throwable cause = clientErrorRef.get();
254 if (cause != null) {
255 throw cause;
256 }
257 fail("timed out waiting for clientLatch(" +
258 "initial count: " + count + ", current count: " + clientLatch.getCount() +
259 ')');
260 }
261 assertTrue(isConnected(cc));
262
263 assertNotNull(cc.localAddress());
264 assertNotNull(cc.remoteAddress());
265
266 if (supportDisconnect()) {
267 try {
268
269 cc.disconnect().asStage().sync();
270 } catch (CompletionException e) {
271 if (e.getCause() instanceof SocketException) {
272 if (disconnectMightFail((DatagramChannel) cc)) {
273 return;
274 }
275 }
276 throw e;
277 }
278
279
280 assertFalse(isConnected(cc));
281 assertNotNull(cc.localAddress());
282 assertNull(cc.remoteAddress());
283
284 Throwable cause = cc.writeAndFlush(buf.copy()).asStage().getCause();
285 assertThat(cause).isInstanceOf(NotYetConnectedException.class);
286 }
287 } finally {
288 closeChannel(cc);
289 closeChannel(sc);
290 }
291 }
292
293 protected abstract boolean isConnected(Channel channel);
294
295 protected abstract Channel setupClientChannel(Bootstrap cb, byte[] bytes, CountDownLatch latch,
296 AtomicReference<Throwable> errorRef) throws Throwable;
297
298 protected abstract Channel setupServerChannel(Bootstrap sb, byte[] bytes, SocketAddress sender,
299 CountDownLatch latch, AtomicReference<Throwable> errorRef,
300 boolean echo) throws Throwable;
301
302 protected abstract boolean supportDisconnect();
303
304 protected boolean disconnectMightFail(DatagramChannel channel) {
305 return false;
306 }
307
308 protected abstract Future<Void> write(Channel cc, Buffer buf, SocketAddress remote);
309
310 protected static void closeChannel(Channel channel) throws Exception {
311 if (channel != null) {
312 channel.close().asStage().sync();
313 }
314 }
315
316 protected InetSocketAddress sendToAddress(InetSocketAddress serverAddress) {
317 InetAddress addr = serverAddress.getAddress();
318 if (addr.isAnyLocalAddress()) {
319 if (addr instanceof Inet6Address) {
320 return new InetSocketAddress(NetUtil.LOCALHOST6, serverAddress.getPort());
321 }
322 return new InetSocketAddress(NetUtil.LOCALHOST4, serverAddress.getPort());
323 }
324 return serverAddress;
325 }
326 }