1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.channel.ChannelHandlerContext;
21 import io.netty5.channel.ChannelOption;
22 import io.netty5.channel.SimpleChannelInboundHandler;
23 import io.netty5.channel.socket.DatagramPacket;
24 import io.netty5.channel.socket.DatagramChannel;
25 import io.netty5.channel.socket.SocketProtocolFamily;
26 import io.netty5.testsuite.transport.TestsuitePermutation;
27 import io.netty5.util.internal.SocketUtils;
28 import org.junit.jupiter.api.Test;
29 import org.junit.jupiter.api.TestInfo;
30
31 import java.io.IOException;
32 import java.net.InetAddress;
33 import java.net.InetSocketAddress;
34 import java.net.MulticastSocket;
35 import java.net.NetworkInterface;
36 import java.net.UnknownHostException;
37 import java.util.Enumeration;
38 import java.util.List;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
41
42 import static io.netty5.util.NetUtil.isFamilySupported;
43
44 import static org.junit.jupiter.api.Assertions.assertEquals;
45 import static org.junit.jupiter.api.Assertions.assertFalse;
46 import static org.junit.jupiter.api.Assertions.assertTrue;
47 import static org.junit.jupiter.api.Assertions.fail;
48 import static org.junit.jupiter.api.Assumptions.assumeTrue;
49
50 public class DatagramMulticastTest extends AbstractDatagramTest {
51 @Test
52 public void testMulticast(TestInfo testInfo) throws Throwable {
53 run(testInfo, this::testMulticast);
54 }
55
56 public void testMulticast(Bootstrap sb, Bootstrap cb) throws Throwable {
57 NetworkInterface iface = multicastNetworkInterface();
58 assumeTrue(iface != null, "No NetworkInterface found that supports multicast and " +
59 socketProtocolFamily());
60
61 MulticastTestHandler mhandler = new MulticastTestHandler();
62
63 sb.handler(new SimpleChannelInboundHandler<>() {
64 @Override
65 public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
66
67 }
68 });
69
70 cb.handler(mhandler);
71
72 sb.option(ChannelOption.IP_MULTICAST_IF, iface);
73 sb.option(ChannelOption.SO_REUSEADDR, true);
74
75 cb.option(ChannelOption.IP_MULTICAST_IF, iface);
76 cb.option(ChannelOption.SO_REUSEADDR, true);
77
78 DatagramChannel sc = (DatagramChannel) sb.bind(newSocketAddress(iface)).asStage().get();
79 assertEquals(iface, sc.getOption(ChannelOption.IP_MULTICAST_IF));
80
81 InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
82 cb.localAddress(addr.getPort());
83
84 DatagramChannel cc = (DatagramChannel) cb.bind().asStage().get();
85 assertEquals(iface, cc.getOption(ChannelOption.IP_MULTICAST_IF));
86
87 InetAddress groupAddress = SocketUtils.addressByName(groupAddress());
88 cc.joinGroup(groupAddress, iface, null).asStage().sync();
89
90 InetSocketAddress destAddress = new InetSocketAddress(groupAddress, addr.getPort());
91
92 BufferAllocator allocator = sc.bufferAllocator();
93 sc.writeAndFlush(new DatagramPacket(allocator.allocate(4).writeInt(1), destAddress)).asStage().sync();
94 assertTrue(mhandler.await());
95
96
97 cc.leaveGroup(groupAddress, iface, null).asStage().sync();
98
99
100 Thread.sleep(1000);
101
102
103 sc.writeAndFlush(new DatagramPacket(allocator.allocate(4).writeInt(1), destAddress)).asStage().sync();
104 mhandler.await();
105
106 cc.setOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED, false);
107 sc.setOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED, false);
108
109 assertFalse(cc.getOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED));
110 assertFalse(sc.getOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED));
111
112 cc.setOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED, true);
113 sc.setOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED, true);
114
115 assertTrue(cc.getOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED));
116 assertTrue(sc.getOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED));
117
118 sc.close().asStage().await();
119 cc.close().asStage().await();
120 }
121
122 private static final class MulticastTestHandler extends SimpleChannelInboundHandler<DatagramPacket> {
123 private final CountDownLatch latch = new CountDownLatch(1);
124
125 private boolean done;
126 private volatile boolean fail;
127 private volatile Throwable error;
128
129 @Override
130 protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
131 if (done) {
132 fail = true;
133 }
134
135 try {
136 assertEquals(1, msg.content().readInt());
137 } catch (Throwable e) {
138 error = e;
139 }
140
141 latch.countDown();
142
143
144 done = true;
145 }
146
147 public boolean await() throws Exception {
148 boolean success = latch.await(10, TimeUnit.SECONDS);
149 Throwable error = this.error;
150 if (error != null) {
151 throw new Exception("Exception thrown in messageReceived", error);
152 }
153 if (fail) {
154
155 fail();
156 }
157 return success;
158 }
159 }
160
161 @Override
162 protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
163 return SocketTestPermutation.INSTANCE.datagram(socketProtocolFamily());
164 }
165
166 private InetSocketAddress newAnySocketAddress() throws UnknownHostException {
167 SocketProtocolFamily family = SocketProtocolFamily.of(socketProtocolFamily());
168 switch (family) {
169 case INET:
170 return new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
171 case INET6:
172 return new InetSocketAddress(InetAddress.getByName("::"), 0);
173 default:
174 throw new UnsupportedOperationException("Any address not supported: " + family);
175 }
176 }
177
178 private InetSocketAddress newSocketAddress(NetworkInterface iface) {
179 Enumeration<InetAddress> addresses = iface.getInetAddresses();
180 while (addresses.hasMoreElements()) {
181 InetAddress address = addresses.nextElement();
182 if (isFamilySupported(address, socketProtocolFamily())) {
183 return new InetSocketAddress(address, 0);
184 }
185 }
186 throw new AssertionError();
187 }
188
189 private NetworkInterface multicastNetworkInterface() throws IOException {
190 Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
191 while (interfaces.hasMoreElements()) {
192 NetworkInterface iface = interfaces.nextElement();
193 if (iface.isUp() && iface.supportsMulticast()) {
194 Enumeration<InetAddress> addresses = iface.getInetAddresses();
195 while (addresses.hasMoreElements()) {
196 InetAddress address = addresses.nextElement();
197 if (isFamilySupported(address, protocolFamily())) {
198 try (MulticastSocket socket = new MulticastSocket(newAnySocketAddress())) {
199 socket.setReuseAddress(true);
200 socket.setNetworkInterface(iface);
201 socket.send(new java.net.DatagramPacket(new byte[]{1, 2, 3, 4}, 4,
202 new InetSocketAddress(groupAddress(), 12345)));
203 return iface;
204 } catch (IOException ignore) {
205
206 }
207 }
208 }
209 }
210 }
211 return null;
212 }
213
214 private String groupAddress() {
215 switch (groupProtocolFamily()) {
216 case INET:
217 return "230.0.0.1";
218 case INET6:
219 return "FF01:0:0:0:0:0:0:101";
220 default:
221 throw new UnsupportedOperationException();
222 }
223 }
224 }