View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.socket.nio;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.channel.ChannelShutdownDirection;
20  import io.netty5.channel.FixedRecvBufferAllocator;
21  import io.netty5.util.Resource;
22  import io.netty5.buffer.api.WritableComponent;
23  import io.netty5.buffer.api.WritableComponentProcessor;
24  import io.netty5.channel.AddressedEnvelope;
25  import io.netty5.channel.Channel;
26  import io.netty5.channel.ChannelException;
27  import io.netty5.channel.ChannelMetadata;
28  import io.netty5.channel.ChannelOption;
29  import io.netty5.channel.ChannelOutboundBuffer;
30  import io.netty5.channel.DefaultBufferAddressedEnvelope;
31  import io.netty5.channel.EventLoop;
32  import io.netty5.channel.RecvBufferAllocator;
33  import io.netty5.channel.RecvBufferAllocator.Handle;
34  import io.netty5.channel.nio.AbstractNioMessageChannel;
35  import io.netty5.channel.socket.DatagramPacket;
36  import io.netty5.util.concurrent.Future;
37  import io.netty5.util.internal.PlatformDependent;
38  import io.netty5.util.internal.SocketUtils;
39  import io.netty5.util.internal.StringUtil;
40  import io.netty5.util.internal.logging.InternalLogger;
41  import io.netty5.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.net.InetAddress;
45  import java.net.InetSocketAddress;
46  import java.net.NetworkInterface;
47  import java.net.ProtocolFamily;
48  import java.net.SocketAddress;
49  import java.net.SocketException;
50  import java.net.SocketOption;
51  import java.net.StandardSocketOptions;
52  import java.nio.ByteBuffer;
53  import java.nio.channels.DatagramChannel;
54  import java.nio.channels.MembershipKey;
55  import java.nio.channels.SelectionKey;
56  import java.nio.channels.spi.SelectorProvider;
57  import java.util.ArrayList;
58  import java.util.HashMap;
59  import java.util.Iterator;
60  import java.util.List;
61  import java.util.Map;
62  import java.util.function.Predicate;
63  
64  import static io.netty5.channel.ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION;
65  
66  import static io.netty5.channel.socket.nio.NioChannelUtil.isDomainSocket;
67  import static io.netty5.channel.socket.nio.NioChannelUtil.toDomainSocketAddress;
68  import static io.netty5.channel.socket.nio.NioChannelUtil.toJdkFamily;
69  import static io.netty5.channel.socket.nio.NioChannelUtil.toUnixDomainSocketAddress;
70  import static java.util.Objects.requireNonNull;
71  
72  /**
73   * An NIO {@link io.netty5.channel.socket.DatagramChannel} that sends and receives an
74   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
75   *
76   * @see AddressedEnvelope
77   * @see DatagramPacket
78   *
79   *
80   * <h3>Available options</h3>
81   *
82   * In addition to the options provided by {@link io.netty5.channel.socket.DatagramChannel},
83   * {@link NioDatagramChannel} allows the following options in the option map:
84   *
85   * <table border="1" cellspacing="0" cellpadding="6">
86   * <tr>
87   * <th>{@link ChannelOption}</th>
88   * <th>{@code INET}</th>
89   * <th>{@code INET6}</th>
90   * <th>{@code UNIX</th>
91   * </tr><tr>
92   * <td>{@link NioChannelOption}</td><td>X</td><td>X</td><td>X</td>
93   * </tr>
94   * </table>
95   */
96  public final class NioDatagramChannel
97          extends AbstractNioMessageChannel<Channel, SocketAddress, SocketAddress>
98          implements io.netty5.channel.socket.DatagramChannel {
99      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioDatagramChannel.class);
100 
101     private static final ChannelMetadata METADATA = new ChannelMetadata(true);
102     private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
103     private static final String EXPECTED_TYPES =
104             " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
105             StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
106             StringUtil.simpleClassName(Buffer.class) + ", " +
107             StringUtil.simpleClassName(SocketAddress.class) + ">, " +
108             StringUtil.simpleClassName(Buffer.class) + ')';
109 
110     private static final Predicate<Handle> TRUE_SUPPLIER = h -> true;
111 
112     private final ProtocolFamily family;
113 
114     private volatile boolean inputShutdown;
115     private volatile boolean outputShutdown;
116 
117     private Map<InetAddress, List<MembershipKey>> memberships;
118 
119     private volatile boolean activeOnOpen;
120     private volatile boolean bound;
121 
122     private static DatagramChannel newSocket(SelectorProvider provider) {
123         try {
124              // Use the SelectorProvider to open SocketChannel and so remove condition in
125              // SelectorProvider#provider() which is called by each DatagramChannel.open() otherwise.
126              // See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
127             return provider.openDatagramChannel();
128         } catch (IOException e) {
129             throw new ChannelException("Failed to open a socket.", e);
130         }
131     }
132 
133     private static DatagramChannel newSocket(SelectorProvider provider, ProtocolFamily family) {
134         if (family == null) {
135             return newSocket(provider);
136         }
137         try {
138             return provider.openDatagramChannel(family);
139         } catch (IOException e) {
140             throw new ChannelException("Failed to open a socket.", e);
141         }
142     }
143 
144     /**
145      * Create a new instance which will use the Operation Systems default {@link ProtocolFamily}.
146      */
147     public NioDatagramChannel(EventLoop eventLoop) {
148         this(eventLoop, newSocket(DEFAULT_SELECTOR_PROVIDER), null);
149     }
150 
151     /**
152      * Create a new instance using the given {@link SelectorProvider}
153      * which will use the Operation Systems default {@link ProtocolFamily}.
154      */
155     public NioDatagramChannel(EventLoop eventLoop, SelectorProvider provider) {
156         this(eventLoop, newSocket(provider), null);
157     }
158 
159     /**
160      * Create a new instance using the given {@link ProtocolFamily}. If {@code null} is used it will depend
161      * on the Operation Systems default which will be chosen.
162      */
163     public NioDatagramChannel(EventLoop eventLoop, ProtocolFamily family) {
164         this(eventLoop, DEFAULT_SELECTOR_PROVIDER, family);
165     }
166 
167     /**
168      * Create a new instance using the given {@link SelectorProvider} and {@link ProtocolFamily}.
169      * If {@link ProtocolFamily} is {@code null} it will depend on the Operation Systems default
170      * which will be chosen.
171      */
172     public NioDatagramChannel(EventLoop eventLoop, SelectorProvider provider, ProtocolFamily family) {
173         this(eventLoop, newSocket(provider, toJdkFamily(family)), family);
174     }
175 
176     /**
177      * Create a new instance from the given {@link DatagramChannel}.
178      */
179     public NioDatagramChannel(EventLoop eventLoop, DatagramChannel socket, ProtocolFamily family) {
180         super(null, eventLoop, METADATA, new FixedRecvBufferAllocator(2048), socket, SelectionKey.OP_READ);
181         this.family = toJdkFamily(family);
182     }
183 
184     @SuppressWarnings("unchecked")
185     @Override
186     protected <T> T getExtendedOption(ChannelOption<T> option) {
187         if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
188             return (T) Boolean.valueOf(isActiveOnOpen());
189         }
190         SocketOption<T> socketOption = NioChannelOption.toSocketOption(option);
191         if (socketOption != null) {
192             return NioChannelOption.getOption(javaChannel(), socketOption);
193         }
194         return super.getExtendedOption(option);
195     }
196 
197     @Override
198     protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
199         if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
200             setActiveOnOpen((Boolean) value);
201         } else {
202             SocketOption<T> socketOption = NioChannelOption.toSocketOption(option);
203             if (socketOption != null) {
204                 try {
205                     // See: https://github.com/netty/netty/issues/576
206                     if (socketOption == StandardSocketOptions.SO_BROADCAST &&
207                             !isAnyLocalAddress() &&
208                             !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
209                         // Warn a user about the fact that a non-root user can't receive a
210                         // broadcast packet on *nix if the socket is bound on non-wildcard address.
211                         logger.warn(
212                                 "A non-root user can't receive a broadcast packet if the socket " +
213                                         "is not bound to a wildcard address; setting the SO_BROADCAST flag " +
214                                         "anyway as requested on the socket which is bound to " +
215                                         javaChannel().getLocalAddress() + '.');
216                     }
217                     NioChannelOption.setOption(javaChannel(), socketOption, value);
218                 } catch (IOException e) {
219                     throw new ChannelException(e);
220                 }
221             } else {
222                 super.setExtendedOption(option, value);
223             }
224         }
225     }
226 
227     @Override
228     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
229         if (option == DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
230             return true;
231         }
232         SocketOption<?> socketOption = NioChannelOption.toSocketOption(option);
233         if (socketOption != null) {
234             return NioChannelOption.isOptionSupported(javaChannel(), socketOption);
235         }
236         return super.isExtendedOptionSupported(option);
237     }
238 
239     private boolean isActiveOnOpen() {
240         return activeOnOpen;
241     }
242 
243     private void setActiveOnOpen(boolean activeOnOpen) {
244         if (isRegistered()) {
245             throw new IllegalStateException("Can only changed before channel was registered");
246         }
247         this.activeOnOpen = activeOnOpen;
248     }
249 
250     private boolean isAnyLocalAddress() throws IOException {
251         SocketAddress address = javaChannel().getLocalAddress();
252         return address instanceof InetSocketAddress && ((InetSocketAddress) address).getAddress().isAnyLocalAddress();
253     }
254 
255     private NetworkInterface getNetworkInterface() {
256         try {
257             return javaChannel().getOption(StandardSocketOptions.IP_MULTICAST_IF);
258         } catch (IOException e) {
259             throw new ChannelException(e);
260         }
261     }
262 
263     @Override
264     protected void doShutdown(ChannelShutdownDirection direction) {
265         switch (direction) {
266             case Inbound:
267                 inputShutdown = true;
268                 break;
269             case Outbound:
270                 outputShutdown = true;
271                 break;
272             default:
273                 throw new AssertionError();
274         }
275     }
276 
277     @Override
278     public boolean isShutdown(ChannelShutdownDirection direction) {
279         if (!isActive()) {
280             return true;
281         }
282         switch (direction) {
283             case Inbound:
284                 return inputShutdown;
285             case Outbound:
286                 return outputShutdown;
287             default:
288                 throw new AssertionError();
289         }
290     }
291 
292     @Override
293     @SuppressWarnings("deprecation")
294     public boolean isActive() {
295         DatagramChannel ch = javaChannel();
296         return ch.isOpen() && (getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
297                 || bound);
298     }
299 
300     @Override
301     public boolean isConnected() {
302         return javaChannel().isConnected();
303     }
304 
305     @Override
306     protected DatagramChannel javaChannel() {
307         return (DatagramChannel) super.javaChannel();
308     }
309 
310     @Override
311     protected SocketAddress localAddress0() {
312         try {
313             SocketAddress address = javaChannel().getLocalAddress();
314             if (isDomainSocket(family)) {
315                 return toDomainSocketAddress(address);
316             }
317             return address;
318         } catch (IOException e) {
319             // Just return null
320             return null;
321         }
322     }
323 
324     @Override
325     protected SocketAddress remoteAddress0() {
326         try {
327             SocketAddress address = javaChannel().getRemoteAddress();
328             if (isDomainSocket(family)) {
329                 return toDomainSocketAddress(address);
330             }
331             return address;
332         } catch (IOException e) {
333             // Just return null
334             return null;
335         }
336     }
337 
338     @Override
339     protected void doBind(SocketAddress localAddress) throws Exception {
340         doBind0(localAddress);
341     }
342 
343     private void doBind0(SocketAddress localAddress) throws Exception {
344         if (isDomainSocket(family)) {
345             localAddress = toUnixDomainSocketAddress(localAddress);
346         }
347         SocketUtils.bind(javaChannel(), localAddress);
348         bound = true;
349     }
350 
351     @Override
352     protected boolean doConnect(SocketAddress remoteAddress,
353             SocketAddress localAddress) throws Exception {
354         if (localAddress != null) {
355             doBind0(localAddress);
356         }
357 
358         boolean success = false;
359         try {
360             javaChannel().connect(remoteAddress);
361             // When connected we are also bound
362             bound = true;
363             success = true;
364             return true;
365         } finally {
366             if (!success) {
367                 doClose();
368             }
369         }
370     }
371 
372     @Override
373     protected boolean doFinishConnect(SocketAddress requestedRemoteAddress) {
374         return true;
375     }
376 
377     @Override
378     protected void doDisconnect() throws Exception {
379         javaChannel().disconnect();
380     }
381 
382     @Override
383     protected int doReadMessages(List<Object> buf) throws Exception {
384         RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
385 
386         return doReadBufferMessages(allocHandle, buf);
387     }
388 
389     private int doReadBufferMessages(Handle allocHandle, List<Object> buf) throws IOException {
390         Buffer data = allocHandle.allocate(bufferAllocator());
391         allocHandle.attemptedBytesRead(data.writableBytes());
392         boolean free = true;
393         try {
394             ReceiveDatagram receiveDatagram = new ReceiveDatagram(javaChannel());
395             data.forEachWritable(0, receiveDatagram);
396             SocketAddress remoteAddress = receiveDatagram.remoteAddress;
397             if (remoteAddress == null) {
398                 return 0;
399             }
400 
401             allocHandle.lastBytesRead(receiveDatagram.bytesReceived);
402             data.skipWritableBytes(allocHandle.lastBytesRead());
403             buf.add(new DatagramPacket(data, localAddress(), remoteAddress));
404             free = false;
405             return 1;
406         } finally {
407             if (free) {
408                 data.close();
409             }
410         }
411     }
412 
413     @Override
414     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
415         final SocketAddress remoteAddress;
416         final Object data;
417         if (msg instanceof AddressedEnvelope) {
418             @SuppressWarnings("unchecked")
419             AddressedEnvelope<?, SocketAddress> envelope = (AddressedEnvelope<?, SocketAddress>) msg;
420             remoteAddress = envelope.recipient();
421             data = envelope.content();
422         } else {
423             data = msg;
424             remoteAddress = null;
425         }
426 
427         Buffer buf = (Buffer) data;
428         final int length = buf.readableBytes();
429         if (length == 0) {
430             return true;
431         }
432 
433         int initialReadable = buf.readableBytes();
434         buf.forEachReadable(0, (index, component) -> {
435             final int writtenBytes;
436             if (remoteAddress != null) {
437                 writtenBytes = javaChannel().send(component.readableBuffer(), remoteAddress);
438             } else {
439                 writtenBytes = javaChannel().write(component.readableBuffer());
440             }
441             component.skipReadableBytes(writtenBytes);
442             return true;
443         });
444         return buf.readableBytes() < initialReadable;
445     }
446 
447     @Override
448     protected Object filterOutboundMessage(Object msg) {
449         if (msg instanceof DatagramPacket) {
450             DatagramPacket p = (DatagramPacket) msg;
451             Buffer content = p.content();
452             if (isSingleDirectBuffer(content)) {
453                 return p;
454             }
455             return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
456         }
457 
458         if (msg instanceof Buffer) {
459             Buffer buf = (Buffer) msg;
460             if (isSingleDirectBuffer(buf)) {
461                 return buf;
462             }
463             return newDirectBuffer(buf);
464         }
465 
466         if (msg instanceof AddressedEnvelope) {
467             @SuppressWarnings("unchecked")
468             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
469             Object content = e.content();
470             if (content instanceof Buffer) {
471                 Buffer buf = (Buffer) content;
472                 if (isSingleDirectBuffer(buf)) {
473                     return e;
474                 }
475                 return new DefaultBufferAddressedEnvelope<>(newDirectBuffer((Resource<?>) e, buf), e.recipient());
476             }
477         }
478 
479         throw new UnsupportedOperationException(
480                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
481     }
482 
483     /**
484      * Checks if the specified buffer is a direct buffer and not composite.
485      * (We check this because otherwise we need to make it a non-composite buffer.)
486      */
487     private static boolean isSingleDirectBuffer(Buffer buf) {
488         return buf.isDirect() && buf.countComponents() == 1;
489     }
490 
491     @Override
492     protected boolean continueOnWriteError() {
493         // Continue on write error as a DatagramChannel can write to multiple remote peers
494         //
495         // See https://github.com/netty/netty/issues/2665
496         return true;
497     }
498 
499     private NetworkInterface networkInterface() throws SocketException {
500         NetworkInterface iface = getNetworkInterface();
501         if (iface == null) {
502             SocketAddress localAddress = localAddress();
503             if (localAddress instanceof InetSocketAddress) {
504                 return NetworkInterface.getByInetAddress(((InetSocketAddress) localAddress()).getAddress());
505             }
506             throw new UnsupportedOperationException();
507         }
508         return iface;
509     }
510 
511     @Override
512     public Future<Void> joinGroup(InetAddress multicastAddress) {
513         try {
514             return joinGroup(
515                     multicastAddress, networkInterface(), null);
516         } catch (SocketException | UnsupportedOperationException e) {
517             return newFailedFuture(e);
518         }
519     }
520 
521     @Override
522     public Future<Void> joinGroup(
523             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
524         requireNonNull(multicastAddress, "multicastAddress");
525         requireNonNull(networkInterface, "networkInterface");
526 
527         try {
528             MembershipKey key;
529             if (source == null) {
530                 key = javaChannel().join(multicastAddress, networkInterface);
531             } else {
532                 key = javaChannel().join(multicastAddress, networkInterface, source);
533             }
534 
535             synchronized (this) {
536                 List<MembershipKey> keys = null;
537                 if (memberships == null) {
538                     memberships = new HashMap<>();
539                 } else {
540                     keys = memberships.get(multicastAddress);
541                 }
542                 if (keys == null) {
543                     keys = new ArrayList<>();
544                     memberships.put(multicastAddress, keys);
545                 }
546                 keys.add(key);
547             }
548 
549             return newSucceededFuture();
550         } catch (Throwable e) {
551             return newFailedFuture(e);
552         }
553     }
554 
555     @Override
556     public Future<Void> leaveGroup(InetAddress multicastAddress) {
557         try {
558             return leaveGroup(
559                     multicastAddress, networkInterface(), null);
560         } catch (SocketException | UnsupportedOperationException e) {
561             return newFailedFuture(e);
562         }
563     }
564 
565     @Override
566     public Future<Void> leaveGroup(
567             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
568         requireNonNull(multicastAddress, "multicastAddress");
569         requireNonNull(networkInterface, "networkInterface");
570 
571         synchronized (this) {
572             if (memberships != null) {
573                 List<MembershipKey> keys = memberships.get(multicastAddress);
574                 if (keys != null) {
575                     Iterator<MembershipKey> keyIt = keys.iterator();
576 
577                     while (keyIt.hasNext()) {
578                         MembershipKey key = keyIt.next();
579                         if (networkInterface.equals(key.networkInterface())) {
580                            if (source == null && key.sourceAddress() == null ||
581                                source != null && source.equals(key.sourceAddress())) {
582                                key.drop();
583                                keyIt.remove();
584                            }
585                         }
586                     }
587                     if (keys.isEmpty()) {
588                         memberships.remove(multicastAddress);
589                     }
590                 }
591             }
592         }
593         return newSucceededFuture();
594     }
595 
596     /**
597      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
598      */
599     @Override
600     public Future<Void> block(
601             InetAddress multicastAddress, NetworkInterface networkInterface,
602             InetAddress sourceToBlock) {
603         requireNonNull(multicastAddress, "multicastAddress");
604         requireNonNull(sourceToBlock, "sourceToBlock");
605         requireNonNull(networkInterface, "networkInterface");
606 
607         synchronized (this) {
608             if (memberships != null) {
609                 List<MembershipKey> keys = memberships.get(multicastAddress);
610                 for (MembershipKey key: keys) {
611                     if (networkInterface.equals(key.networkInterface())) {
612                         try {
613                             key.block(sourceToBlock);
614                         } catch (IOException e) {
615                             return newFailedFuture(e);
616                         }
617                     }
618                 }
619             }
620         }
621         return newSucceededFuture();
622     }
623 
624     /**
625      * Block the given sourceToBlock address for the given multicastAddress
626      */
627     @Override
628     public Future<Void> block(InetAddress multicastAddress, InetAddress sourceToBlock) {
629         try {
630             return block(
631                     multicastAddress, networkInterface(),
632                     sourceToBlock);
633         } catch (SocketException | UnsupportedOperationException e) {
634             return newFailedFuture(e);
635         }
636     }
637 
638     void clearReadPending0() {
639         clearReadPending();
640     }
641 
642     @Override
643     protected boolean closeOnReadError(Throwable cause) {
644         // We do not want to close on SocketException when using DatagramChannel as we usually can continue receiving.
645         // See https://github.com/netty/netty/issues/5893
646         if (cause instanceof SocketException) {
647             return false;
648         }
649         return super.closeOnReadError(cause);
650     }
651 
652     @Override
653     protected boolean continueReading(RecvBufferAllocator.Handle allocHandle) {
654         // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
655         // as we read anything).
656         return allocHandle.continueReading(isAutoRead(), TRUE_SUPPLIER);
657     }
658 
659     private static final class ReceiveDatagram implements WritableComponentProcessor<IOException> {
660         private final DatagramChannel channel;
661         private SocketAddress remoteAddress;
662         private int bytesReceived;
663 
664         ReceiveDatagram(DatagramChannel channel) {
665             this.channel = channel;
666         }
667 
668         @Override
669         public boolean process(int index, WritableComponent component) throws IOException {
670             ByteBuffer dst = component.writableBuffer();
671             int position = dst.position();
672             remoteAddress =  channel.receive(dst);
673             bytesReceived = dst.position() - position;
674             return false;
675         }
676     }
677 }