1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelHandler;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.channel.socket.DatagramChannel;
26 import io.netty.channel.socket.DatagramPacket;
27 import io.netty.channel.socket.InternetProtocolFamily;
28 import io.netty.channel.socket.oio.OioDatagramChannel;
29 import io.netty.testsuite.transport.TestsuitePermutation;
30 import io.netty.util.NetUtil;
31 import io.netty.util.internal.SocketUtils;
32 import org.junit.jupiter.api.Test;
33 import org.junit.jupiter.api.TestInfo;
34
35 import java.io.IOException;
36 import java.net.InetAddress;
37 import java.net.InetSocketAddress;
38 import java.net.MulticastSocket;
39 import java.net.NetworkInterface;
40 import java.net.UnknownHostException;
41 import java.util.Enumeration;
42 import java.util.List;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45
46 import static org.junit.jupiter.api.Assertions.assertEquals;
47 import static org.junit.jupiter.api.Assertions.assertFalse;
48 import static org.junit.jupiter.api.Assertions.assertTrue;
49 import static org.junit.jupiter.api.Assertions.fail;
50 import static org.junit.jupiter.api.Assumptions.assumeTrue;
51
52 public class DatagramMulticastTest extends AbstractDatagramTest {
53
54 @Test
55 public void testMulticast(TestInfo testInfo) throws Throwable {
56 run(testInfo, new Runner<Bootstrap, Bootstrap>() {
57 @Override
58 public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
59 testMulticast(bootstrap, bootstrap2);
60 }
61 });
62 }
63
64 public void testMulticast(Bootstrap sb, Bootstrap cb) throws Throwable {
65 NetworkInterface iface = multicastNetworkInterface();
66 assumeTrue(iface != null, "No NetworkInterface found that supports multicast and " +
67 socketInternetProtocalFamily());
68
69 MulticastTestHandler mhandler = new MulticastTestHandler();
70
71 sb.handler(new SimpleChannelInboundHandler<Object>() {
72 @Override
73 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
74
75 }
76
77 @Override
78 public boolean isSharable() {
79 return true;
80 }
81 });
82
83 cb.handler(mhandler);
84
85 sb.option(ChannelOption.IP_MULTICAST_IF, iface);
86 sb.option(ChannelOption.SO_REUSEADDR, true);
87
88 cb.option(ChannelOption.IP_MULTICAST_IF, iface);
89 cb.option(ChannelOption.SO_REUSEADDR, true);
90
91 DatagramChannel sc = null;
92
93 int attempts = 5;
94 ChannelFuture clientFuture;
95 do {
96 if (sc != null) {
97 sc.close().sync();
98 }
99 sc = (DatagramChannel) sb.bind(newSocketAddress(iface)).sync().channel();
100 if (sc instanceof OioDatagramChannel) {
101
102
103
104 sc.close().awaitUninterruptibly();
105 return;
106 }
107 assertEquals(iface, sc.config().getNetworkInterface());
108 assertInterfaceAddress(iface, sc.config().getInterface());
109
110 InetSocketAddress addr = sc.localAddress();
111 cb.localAddress(addr.getPort());
112 clientFuture = cb.bind().await();
113 } while (!clientFuture.isSuccess() && --attempts > 0);
114 DatagramChannel cc = (DatagramChannel) clientFuture.sync().channel();
115 assertEquals(iface, cc.config().getNetworkInterface());
116 assertInterfaceAddress(iface, cc.config().getInterface());
117
118 InetSocketAddress groupAddress = SocketUtils.socketAddress(groupAddress(), sc.localAddress().getPort());
119
120 cc.joinGroup(groupAddress, iface).sync();
121
122 sc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).sync();
123 assertTrue(mhandler.await());
124
125
126 cc.leaveGroup(groupAddress, iface).sync();
127
128
129 Thread.sleep(1000);
130
131
132 sc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).sync();
133 mhandler.await();
134
135 cc.config().setLoopbackModeDisabled(false);
136 sc.config().setLoopbackModeDisabled(false);
137
138 assertFalse(cc.config().isLoopbackModeDisabled());
139 assertFalse(sc.config().isLoopbackModeDisabled());
140
141 cc.config().setLoopbackModeDisabled(true);
142 sc.config().setLoopbackModeDisabled(true);
143
144 assertTrue(cc.config().isLoopbackModeDisabled());
145 assertTrue(sc.config().isLoopbackModeDisabled());
146
147 sc.close().awaitUninterruptibly();
148 cc.close().awaitUninterruptibly();
149 }
150
151 private static void assertInterfaceAddress(NetworkInterface networkInterface, InetAddress expected) {
152 Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
153 while (addresses.hasMoreElements()) {
154 if (expected.equals(addresses.nextElement())) {
155 return;
156 }
157 }
158 fail();
159 }
160
161 @ChannelHandler.Sharable
162 private static final class MulticastTestHandler extends SimpleChannelInboundHandler<DatagramPacket> {
163 private final CountDownLatch latch = new CountDownLatch(1);
164
165 private boolean done;
166 private volatile boolean fail;
167
168 @Override
169 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
170 if (done) {
171 fail = true;
172 }
173
174 assertEquals(1, msg.content().readInt());
175
176 latch.countDown();
177
178
179 done = true;
180 }
181
182 public boolean await() throws Exception {
183 boolean success = latch.await(10, TimeUnit.SECONDS);
184 if (fail) {
185
186 fail();
187 }
188 return success;
189 }
190 }
191
192 @Override
193 protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
194 return SocketTestPermutation.INSTANCE.datagram(socketInternetProtocalFamily());
195 }
196
197 private InetSocketAddress newAnySocketAddress() throws UnknownHostException {
198 switch (socketInternetProtocalFamily()) {
199 case IPv4:
200 return new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
201 case IPv6:
202 return new InetSocketAddress(InetAddress.getByName("::"), 0);
203 default:
204 throw new AssertionError();
205 }
206 }
207
208 private InetSocketAddress newSocketAddress(NetworkInterface iface) {
209 Enumeration<InetAddress> addresses = iface.getInetAddresses();
210 while (addresses.hasMoreElements()) {
211 InetAddress address = addresses.nextElement();
212 if (socketInternetProtocalFamily().addressType().isAssignableFrom(address.getClass())) {
213 return new InetSocketAddress(address, 0);
214 }
215 }
216 throw new AssertionError();
217 }
218
219 private NetworkInterface multicastNetworkInterface() throws IOException {
220 for (NetworkInterface iface : NetUtil.NETWORK_INTERFACES) {
221 if (iface.isUp() && iface.supportsMulticast()) {
222 Enumeration<InetAddress> addresses = iface.getInetAddresses();
223 while (addresses.hasMoreElements()) {
224 InetAddress address = addresses.nextElement();
225 if (socketInternetProtocalFamily().addressType().isAssignableFrom(address.getClass())) {
226 MulticastSocket socket = new MulticastSocket(newAnySocketAddress());
227 socket.setReuseAddress(true);
228 socket.setNetworkInterface(iface);
229 try {
230 socket.send(new java.net.DatagramPacket(new byte[] { 1, 2, 3, 4 }, 4,
231 new InetSocketAddress(groupAddress(), 12345)));
232 return iface;
233 } catch (IOException ignore) {
234
235 } finally {
236 socket.close();
237 }
238 }
239 }
240 }
241 }
242 return null;
243 }
244
245 private String groupAddress() {
246 return groupInternetProtocalFamily() == InternetProtocolFamily.IPv4?
247 "230.0.0.1" : "FF01:0:0:0:0:0:0:101";
248 }
249 }