View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.channel.ChannelHandlerContext;
21  import io.netty5.channel.ChannelOption;
22  import io.netty5.channel.SimpleChannelInboundHandler;
23  import io.netty5.channel.socket.DatagramPacket;
24  import io.netty5.channel.socket.DatagramChannel;
25  import io.netty5.channel.socket.SocketProtocolFamily;
26  import io.netty5.testsuite.transport.TestsuitePermutation;
27  import io.netty5.util.internal.SocketUtils;
28  import org.junit.jupiter.api.Test;
29  import org.junit.jupiter.api.TestInfo;
30  
31  import java.io.IOException;
32  import java.net.InetAddress;
33  import java.net.InetSocketAddress;
34  import java.net.MulticastSocket;
35  import java.net.NetworkInterface;
36  import java.net.UnknownHostException;
37  import java.util.Enumeration;
38  import java.util.List;
39  import java.util.concurrent.CountDownLatch;
40  import java.util.concurrent.TimeUnit;
41  
42  import static io.netty5.util.NetUtil.isFamilySupported;
43  
44  import static org.junit.jupiter.api.Assertions.assertEquals;
45  import static org.junit.jupiter.api.Assertions.assertFalse;
46  import static org.junit.jupiter.api.Assertions.assertTrue;
47  import static org.junit.jupiter.api.Assertions.fail;
48  import static org.junit.jupiter.api.Assumptions.assumeTrue;
49  
50  public class DatagramMulticastTest extends AbstractDatagramTest {
51      @Test
52      public void testMulticast(TestInfo testInfo) throws Throwable {
53          run(testInfo, this::testMulticast);
54      }
55  
56      public void testMulticast(Bootstrap sb, Bootstrap cb) throws Throwable {
57          NetworkInterface iface = multicastNetworkInterface();
58          assumeTrue(iface != null, "No NetworkInterface found that supports multicast and " +
59                               socketProtocolFamily());
60  
61          MulticastTestHandler mhandler = new MulticastTestHandler();
62  
63          sb.handler(new SimpleChannelInboundHandler<>() {
64              @Override
65              public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
66                  // Nothing will be sent.
67              }
68          });
69  
70          cb.handler(mhandler);
71  
72          sb.option(ChannelOption.IP_MULTICAST_IF, iface);
73          sb.option(ChannelOption.SO_REUSEADDR, true);
74  
75          cb.option(ChannelOption.IP_MULTICAST_IF, iface);
76          cb.option(ChannelOption.SO_REUSEADDR, true);
77  
78          DatagramChannel sc = (DatagramChannel) sb.bind(newSocketAddress(iface)).asStage().get();
79          assertEquals(iface, sc.getOption(ChannelOption.IP_MULTICAST_IF));
80  
81          InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
82          cb.localAddress(addr.getPort());
83  
84          DatagramChannel cc = (DatagramChannel) cb.bind().asStage().get();
85          assertEquals(iface, cc.getOption(ChannelOption.IP_MULTICAST_IF));
86  
87          InetAddress groupAddress = SocketUtils.addressByName(groupAddress());
88          cc.joinGroup(groupAddress, iface, null).asStage().sync();
89  
90          InetSocketAddress destAddress = new InetSocketAddress(groupAddress, addr.getPort());
91  
92                  BufferAllocator allocator = sc.bufferAllocator();
93          sc.writeAndFlush(new DatagramPacket(allocator.allocate(4).writeInt(1), destAddress)).asStage().sync();
94          assertTrue(mhandler.await());
95  
96          // leave the group
97          cc.leaveGroup(groupAddress, iface, null).asStage().sync();
98  
99          // sleep a second to make sure we left the group
100         Thread.sleep(1000);
101 
102         // we should not receive a message anymore as we left the group before
103         sc.writeAndFlush(new DatagramPacket(allocator.allocate(4).writeInt(1), destAddress)).asStage().sync();
104         mhandler.await();
105 
106         cc.setOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED, false);
107         sc.setOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED, false);
108 
109         assertFalse(cc.getOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED));
110         assertFalse(sc.getOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED));
111 
112         cc.setOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED, true);
113         sc.setOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED, true);
114 
115         assertTrue(cc.getOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED));
116         assertTrue(sc.getOption(ChannelOption.IP_MULTICAST_LOOP_DISABLED));
117 
118         sc.close().asStage().await();
119         cc.close().asStage().await();
120     }
121 
122     private static final class MulticastTestHandler extends SimpleChannelInboundHandler<DatagramPacket> {
123         private final CountDownLatch latch = new CountDownLatch(1);
124 
125         private boolean done;
126         private volatile boolean fail;
127         private volatile Throwable error;
128 
129         @Override
130         protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
131             if (done) {
132                 fail = true;
133             }
134 
135             try {
136                 assertEquals(1, msg.content().readInt());
137             } catch (Throwable e) {
138                 error = e;
139             }
140 
141             latch.countDown();
142 
143             // mark the handler as done as we only are supposed to receive one message
144             done = true;
145         }
146 
147         public boolean await() throws Exception {
148             boolean success = latch.await(10, TimeUnit.SECONDS);
149             Throwable error = this.error;
150             if (error != null) {
151                 throw new Exception("Exception thrown in messageReceived", error);
152             }
153             if (fail) {
154                 // fail if we receive a message after we are done
155                 fail();
156             }
157             return success;
158         }
159     }
160 
161     @Override
162     protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
163         return SocketTestPermutation.INSTANCE.datagram(socketProtocolFamily());
164     }
165 
166     private InetSocketAddress newAnySocketAddress() throws UnknownHostException {
167         SocketProtocolFamily family = SocketProtocolFamily.of(socketProtocolFamily());
168         switch (family) {
169             case INET:
170                 return new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
171             case INET6:
172                 return new InetSocketAddress(InetAddress.getByName("::"), 0);
173             default:
174                 throw new UnsupportedOperationException("Any address not supported: " + family);
175         }
176     }
177 
178     private InetSocketAddress newSocketAddress(NetworkInterface iface) {
179         Enumeration<InetAddress> addresses = iface.getInetAddresses();
180         while (addresses.hasMoreElements()) {
181             InetAddress address = addresses.nextElement();
182             if (isFamilySupported(address, socketProtocolFamily())) {
183                 return new InetSocketAddress(address, 0);
184             }
185         }
186         throw new AssertionError();
187     }
188 
189     private NetworkInterface multicastNetworkInterface() throws IOException {
190         Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
191         while (interfaces.hasMoreElements()) {
192             NetworkInterface iface = interfaces.nextElement();
193             if (iface.isUp() && iface.supportsMulticast()) {
194                 Enumeration<InetAddress> addresses = iface.getInetAddresses();
195                 while (addresses.hasMoreElements()) {
196                     InetAddress address = addresses.nextElement();
197                     if (isFamilySupported(address, protocolFamily())) {
198                         try (MulticastSocket socket = new MulticastSocket(newAnySocketAddress())) {
199                             socket.setReuseAddress(true);
200                             socket.setNetworkInterface(iface);
201                             socket.send(new java.net.DatagramPacket(new byte[]{1, 2, 3, 4}, 4,
202                                     new InetSocketAddress(groupAddress(), 12345)));
203                             return iface;
204                         } catch (IOException ignore) {
205                             // Try the next interface
206                         }
207                     }
208                 }
209             }
210         }
211         return null;
212     }
213 
214     private String groupAddress() {
215         switch (groupProtocolFamily()) {
216             case INET:
217                 return "230.0.0.1";
218             case INET6:
219                 return "FF01:0:0:0:0:0:0:101";
220             default:
221                 throw new UnsupportedOperationException();
222         }
223     }
224 }