1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.channel.Channel;
21 import io.netty5.channel.ChannelHandlerAdapter;
22 import io.netty5.channel.ChannelHandlerContext;
23 import io.netty5.channel.ChannelInitializer;
24 import io.netty5.channel.SimpleChannelInboundHandler;
25 import io.netty5.channel.socket.DatagramChannel;
26 import io.netty5.channel.socket.DatagramPacket;
27 import io.netty5.util.concurrent.Future;
28 import org.junit.jupiter.api.Test;
29 import org.junit.jupiter.api.TestInfo;
30
31 import java.net.InetSocketAddress;
32 import java.net.SocketAddress;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import static org.junit.jupiter.api.Assertions.assertEquals;
37 import static org.junit.jupiter.api.Assertions.assertNotNull;
38
39 public class DatagramUnicastInetTest extends DatagramUnicastTest {
40
41 @Test
42 public void testBindWithPortOnly(TestInfo testInfo) throws Throwable {
43 run(testInfo, DatagramUnicastInetTest::testBindWithPortOnly);
44 }
45
46 private static void testBindWithPortOnly(Bootstrap sb, Bootstrap cb) throws Throwable {
47 Channel channel = null;
48 try {
49 cb.handler(new ChannelHandlerAdapter() { });
50 channel = cb.bind(0).asStage().get();
51 } finally {
52 closeChannel(channel);
53 }
54 }
55
56 @Override
57 protected boolean isConnected(Channel channel) {
58 return ((DatagramChannel) channel).isConnected();
59 }
60
61 @Override
62 protected Channel setupClientChannel(Bootstrap cb, final byte[] bytes, final CountDownLatch latch,
63 final AtomicReference<Throwable> errorRef) throws Throwable {
64 cb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
65
66 @Override
67 public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) {
68 try {
69 Buffer buf = msg.content();
70 assertEquals(bytes.length, buf.readableBytes());
71 for (int i = 0; i < bytes.length; i++) {
72 assertEquals(bytes[i], buf.getByte(buf.readerOffset() + i));
73 }
74
75 InetSocketAddress localAddress = (InetSocketAddress) ctx.channel().localAddress();
76 if (localAddress.getAddress().isAnyLocalAddress()) {
77 assertEquals(localAddress.getPort(), ((InetSocketAddress) msg.recipient()).getPort());
78 } else {
79
80 assertEquals(localAddress, msg.recipient());
81 }
82 } finally {
83 latch.countDown();
84 }
85 }
86
87 @Override
88 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
89 errorRef.compareAndSet(null, cause);
90 }
91 });
92 return cb.bind(newSocketAddress()).asStage().get();
93 }
94
95 @Override
96 protected Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final SocketAddress sender,
97 final CountDownLatch latch, final AtomicReference<Throwable> errorRef,
98 final boolean echo) throws Throwable {
99 sb.handler(new ChannelInitializer<>() {
100
101 @Override
102 protected void initChannel(Channel ch) {
103 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
104
105 @Override
106 public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) {
107 try {
108 if (sender == null) {
109 assertNotNull(msg.sender());
110 } else {
111 InetSocketAddress senderAddress = (InetSocketAddress) sender;
112 if (senderAddress.getAddress().isAnyLocalAddress()) {
113 assertEquals(senderAddress.getPort(), ((InetSocketAddress) msg.sender()).getPort());
114 } else {
115 assertEquals(sender, msg.sender());
116 }
117 }
118
119 Buffer buf = msg.content();
120 assertEquals(bytes.length, buf.readableBytes());
121 for (int i = 0; i < bytes.length; i++) {
122 assertEquals(bytes[i], buf.getByte(buf.readerOffset() + i));
123 }
124
125
126 assertEquals(ctx.channel().localAddress(), msg.recipient());
127
128 if (echo) {
129 ctx.writeAndFlush(new DatagramPacket(buf.split(), msg.sender()));
130 }
131 } finally {
132 latch.countDown();
133 }
134 }
135
136 @Override
137 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
138 errorRef.compareAndSet(null, cause);
139 }
140 });
141 }
142 });
143 return sb.bind(newSocketAddress()).asStage().get();
144 }
145
146 @Override
147 protected boolean supportDisconnect() {
148 return true;
149 }
150
151 @Override
152 protected Future<Void> write(Channel cc, Buffer buf, SocketAddress remote) {
153 return cc.write(new DatagramPacket(buf, remote));
154 }
155 }