1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.channel.Channel;
21 import io.netty5.channel.ChannelHandlerAdapter;
22 import io.netty5.channel.ChannelHandlerContext;
23 import io.netty5.channel.ChannelInitializer;
24 import io.netty5.channel.SimpleChannelInboundHandler;
25 import io.netty5.channel.socket.DatagramChannel;
26 import io.netty5.channel.socket.DatagramPacket;
27 import io.netty5.channel.socket.DomainSocketAddress;
28 import io.netty5.testsuite.transport.TestsuitePermutation;
29 import io.netty5.util.concurrent.Future;
30 import org.junit.jupiter.api.Test;
31 import org.junit.jupiter.api.TestInfo;
32 import org.junit.jupiter.api.condition.EnabledIf;
33
34 import java.net.SocketAddress;
35 import java.util.List;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import static org.assertj.core.api.Assertions.assertThat;
40 import static org.junit.jupiter.api.Assertions.assertEquals;
41 import static org.junit.jupiter.api.Assertions.assertNotNull;
42
43 @EnabledIf("isSupported")
44 public class DomainDatagramUnicastTest extends DatagramUnicastTest {
45
46 static boolean isSupported() {
47 return NioDomainSocketTestUtil.isDatagramSupported();
48 }
49
50 @Test
51 void testBind(TestInfo testInfo) throws Throwable {
52 run(testInfo, (bootstrap, bootstrap2) -> testBind(bootstrap2));
53 }
54
55 private void testBind(Bootstrap cb) throws Throwable {
56 Channel channel = null;
57 try {
58 channel = cb.handler(new ChannelHandlerAdapter() { })
59 .bind(newSocketAddress()).asStage().get();
60 assertThat(channel.localAddress()).isNotNull()
61 .isInstanceOf(DomainSocketAddress.class);
62 } finally {
63 closeChannel(channel);
64 }
65 }
66
67 @Override
68 protected boolean supportDisconnect() {
69 return false;
70 }
71
72 @Override
73 protected boolean isConnected(Channel channel) {
74 return ((DatagramChannel) channel).isConnected();
75 }
76
77 @Override
78 protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
79 return SocketTestPermutation.INSTANCE.datagram(NioDomainSocketTestUtil.domainSocketFamily());
80 }
81
82 @Override
83 protected SocketAddress newSocketAddress() {
84 return SocketTestPermutation.newDomainSocketAddress();
85 }
86
87 @Override
88 protected Channel setupClientChannel(Bootstrap cb, final byte[] bytes, final CountDownLatch latch,
89 final AtomicReference<Throwable> errorRef) throws Throwable {
90 cb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
91
92 @Override
93 public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) {
94 try {
95 Buffer buf = msg.content();
96 assertEquals(bytes.length, buf.readableBytes());
97 for (int i = 0; i < bytes.length; i++) {
98 assertEquals(bytes[i], buf.getByte(buf.readerOffset() + i));
99 }
100
101 assertEquals(ctx.channel().localAddress(), msg.recipient());
102 } finally {
103 latch.countDown();
104 }
105 }
106
107 @Override
108 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
109 errorRef.compareAndSet(null, cause);
110 }
111 });
112 return cb.bind(newSocketAddress()).asStage().get();
113 }
114
115 @Override
116 protected Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final SocketAddress sender,
117 final CountDownLatch latch, final AtomicReference<Throwable> errorRef,
118 final boolean echo) throws Throwable {
119 sb.handler(new ChannelInitializer<Channel>() {
120
121 @Override
122 protected void initChannel(Channel ch) {
123 ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
124 @Override
125 public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) {
126 try {
127 if (sender == null) {
128 assertNotNull(msg.sender());
129 } else {
130 assertEquals(sender, msg.sender());
131 }
132
133 Buffer buf = msg.content();
134 assertEquals(bytes.length, buf.readableBytes());
135 for (int i = 0; i < bytes.length; i++) {
136 assertEquals(bytes[i], buf.getByte(buf.readerOffset() + i));
137 }
138
139 assertEquals(ctx.channel().localAddress(), msg.recipient());
140
141 if (echo) {
142 ctx.writeAndFlush(new DatagramPacket(buf.split(), msg.sender()));
143 }
144 } finally {
145 latch.countDown();
146 }
147 }
148
149 @Override
150 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
151 errorRef.compareAndSet(null, cause);
152 }
153 });
154 }
155 });
156 return sb.bind(newSocketAddress()).asStage().get();
157 }
158
159 @Override
160 protected Future<Void> write(Channel cc, Buffer buf, SocketAddress remote) {
161 return cc.write(new DatagramPacket(buf, remote));
162 }
163 }