View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelException;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.nio.AbstractNioMessageChannel;
30  import io.netty.channel.socket.DatagramChannelConfig;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.socket.InternetProtocolFamily;
33  import io.netty.channel.socket.SocketProtocolFamily;
34  import io.netty.util.UncheckedBooleanSupplier;
35  import io.netty.util.internal.ObjectUtil;
36  import io.netty.util.internal.PlatformDependent;
37  import io.netty.util.internal.SocketUtils;
38  import io.netty.util.internal.StringUtil;
39  
40  import java.io.IOException;
41  import java.net.InetAddress;
42  import java.net.InetSocketAddress;
43  import java.net.NetworkInterface;
44  import java.net.SocketAddress;
45  import java.net.SocketException;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.DatagramChannel;
48  import java.nio.channels.MembershipKey;
49  import java.nio.channels.SelectionKey;
50  import java.nio.channels.UnresolvedAddressException;
51  import java.nio.channels.spi.SelectorProvider;
52  import java.util.ArrayList;
53  import java.util.HashMap;
54  import java.util.Iterator;
55  import java.util.List;
56  import java.util.Map;
57  
58  /**
59   * An NIO datagram {@link Channel} that sends and receives an
60   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
61   *
62   * @see AddressedEnvelope
63   * @see DatagramPacket
64   */
65  public final class NioDatagramChannel
66          extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
67  
68      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
69      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
70      private static final String EXPECTED_TYPES =
71              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
72              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
73              StringUtil.simpleClassName(ByteBuf.class) + ", " +
74              StringUtil.simpleClassName(SocketAddress.class) + ">, " +
75              StringUtil.simpleClassName(ByteBuf.class) + ')';
76  
77      private final DatagramChannelConfig config;
78  
79      private Map<InetAddress, List<MembershipKey>> memberships;
80  
81      /**
82       *  Use the {@link SelectorProvider} to open {@link DatagramChannel} and so remove condition in
83       *  {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
84       * <p>
85       *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
86       */
87      private static DatagramChannel newSocket(SelectorProvider provider) {
88          try {
89              return provider.openDatagramChannel();
90          } catch (IOException e) {
91              throw new ChannelException("Failed to open a socket.", e);
92          }
93      }
94  
95      private static DatagramChannel newSocket(SelectorProvider provider, SocketProtocolFamily ipFamily) {
96          if (ipFamily == null) {
97              return newSocket(provider);
98          }
99  
100         try {
101             return provider.openDatagramChannel(ipFamily.toJdkFamily());
102         } catch (IOException e) {
103             throw new ChannelException("Failed to open a socket.", e);
104         }
105     }
106 
107     /**
108      * Create a new instance which will use the Operation Systems default {@link SocketProtocolFamily}.
109      */
110     public NioDatagramChannel() {
111         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
112     }
113 
114     /**
115      * Create a new instance using the given {@link SelectorProvider}
116      * which will use the Operation Systems default {@link SocketProtocolFamily}.
117      */
118     public NioDatagramChannel(SelectorProvider provider) {
119         this(newSocket(provider));
120     }
121 
122     /**
123      * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
124      * on the Operation Systems default which will be chosen.
125      *
126      * @deprecated use {@link NioDatagramChannel#NioDatagramChannel(SocketProtocolFamily)}
127      */
128     @Deprecated
129     public NioDatagramChannel(InternetProtocolFamily ipFamily) {
130         this(ipFamily == null ? null : ipFamily.toSocketProtocolFamily());
131     }
132 
133     /**
134      * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used it will depend
135      * on the Operation Systems default which will be chosen.
136      */
137     public NioDatagramChannel(SocketProtocolFamily protocolFamily) {
138         this(newSocket(DEFAULT_SELECTOR_PROVIDER, protocolFamily));
139     }
140 
141     /**
142      * Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
143      * If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default
144      * which will be chosen.
145      *
146      * @deprecated use {@link NioDatagramChannel#NioDatagramChannel(SelectorProvider, SocketProtocolFamily)}
147      */
148     @Deprecated
149     public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
150         this(provider, ipFamily == null ? null : ipFamily.toSocketProtocolFamily());
151     }
152 
153     /**
154      * Create a new instance using the given {@link SelectorProvider} and {@link SocketProtocolFamily}.
155      * If {@link SocketProtocolFamily} is {@code null} it will depend on the Operation Systems default
156      * which will be chosen.
157      */
158     public NioDatagramChannel(SelectorProvider provider, SocketProtocolFamily protocolFamily) {
159         this(newSocket(provider, protocolFamily));
160     }
161 
162     /**
163      * Create a new instance from the given {@link DatagramChannel}.
164      */
165     public NioDatagramChannel(DatagramChannel socket) {
166         super(null, socket, SelectionKey.OP_READ);
167         config = new NioDatagramChannelConfig(this, socket);
168     }
169 
170     @Override
171     public ChannelMetadata metadata() {
172         return METADATA;
173     }
174 
175     @Override
176     public DatagramChannelConfig config() {
177         return config;
178     }
179 
180     @Override
181     @SuppressWarnings("deprecation")
182     public boolean isActive() {
183         DatagramChannel ch = javaChannel();
184         return ch.isOpen() && (
185                 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
186                 || ch.socket().isBound());
187     }
188 
189     @Override
190     public boolean isConnected() {
191         return javaChannel().isConnected();
192     }
193 
194     @Override
195     protected DatagramChannel javaChannel() {
196         return (DatagramChannel) super.javaChannel();
197     }
198 
199     @Override
200     protected SocketAddress localAddress0() {
201         return javaChannel().socket().getLocalSocketAddress();
202     }
203 
204     @Override
205     protected SocketAddress remoteAddress0() {
206         return javaChannel().socket().getRemoteSocketAddress();
207     }
208 
209     @Override
210     protected void doBind(SocketAddress localAddress) throws Exception {
211         doBind0(localAddress);
212     }
213 
214     private void doBind0(SocketAddress localAddress) throws Exception {
215         SocketUtils.bind(javaChannel(), localAddress);
216     }
217 
218     @Override
219     protected boolean doConnect(SocketAddress remoteAddress,
220             SocketAddress localAddress) throws Exception {
221         if (localAddress != null) {
222             doBind0(localAddress);
223         }
224 
225         boolean success = false;
226         try {
227             javaChannel().connect(remoteAddress);
228             success = true;
229             return true;
230         } finally {
231             if (!success) {
232                 doClose();
233             }
234         }
235     }
236 
237     @Override
238     protected void doFinishConnect() throws Exception {
239         throw new Error();
240     }
241 
242     @Override
243     protected void doDisconnect() throws Exception {
244         javaChannel().disconnect();
245     }
246 
247     @Override
248     protected void doClose() throws Exception {
249         javaChannel().close();
250     }
251 
252     @Override
253     protected int doReadMessages(List<Object> buf) throws Exception {
254         DatagramChannel ch = javaChannel();
255         DatagramChannelConfig config = config();
256         RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
257 
258         ByteBuf data = allocHandle.allocate(config.getAllocator());
259         allocHandle.attemptedBytesRead(data.writableBytes());
260         boolean free = true;
261         try {
262             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
263             int pos = nioData.position();
264             InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
265             if (remoteAddress == null) {
266                 return 0;
267             }
268 
269             allocHandle.lastBytesRead(nioData.position() - pos);
270             buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
271                     localAddress(), remoteAddress));
272             free = false;
273             return 1;
274         } catch (Throwable cause) {
275             PlatformDependent.throwException(cause);
276             return -1;
277         }  finally {
278             if (free) {
279                 data.release();
280             }
281         }
282     }
283 
284     @Override
285     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
286         final SocketAddress remoteAddress;
287         final ByteBuf data;
288         if (msg instanceof AddressedEnvelope) {
289             @SuppressWarnings("unchecked")
290             AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
291             remoteAddress = envelope.recipient();
292             data = envelope.content();
293         } else {
294             data = (ByteBuf) msg;
295             remoteAddress = null;
296         }
297 
298         final int dataLen = data.readableBytes();
299         if (dataLen == 0) {
300             return true;
301         }
302 
303         final ByteBuffer nioData = data.nioBufferCount() == 1 ? data.internalNioBuffer(data.readerIndex(), dataLen)
304                                                               : data.nioBuffer(data.readerIndex(), dataLen);
305         final int writtenBytes;
306         if (remoteAddress != null) {
307             writtenBytes = javaChannel().send(nioData, remoteAddress);
308         } else {
309             writtenBytes = javaChannel().write(nioData);
310         }
311         return writtenBytes > 0;
312     }
313 
314     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
315         if (envelope.recipient() instanceof InetSocketAddress
316                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
317             throw new UnresolvedAddressException();
318         }
319     }
320 
321     @Override
322     protected Object filterOutboundMessage(Object msg) {
323         if (msg instanceof DatagramPacket) {
324             DatagramPacket p = (DatagramPacket) msg;
325             checkUnresolved(p);
326             ByteBuf content = p.content();
327             if (isSingleDirectBuffer(content)) {
328                 return p;
329             }
330             return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
331         }
332 
333         if (msg instanceof ByteBuf) {
334             ByteBuf buf = (ByteBuf) msg;
335             if (isSingleDirectBuffer(buf)) {
336                 return buf;
337             }
338             return newDirectBuffer(buf);
339         }
340 
341         if (msg instanceof AddressedEnvelope) {
342             @SuppressWarnings("unchecked")
343             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
344             checkUnresolved(e);
345             if (e.content() instanceof ByteBuf) {
346                 ByteBuf content = (ByteBuf) e.content();
347                 if (isSingleDirectBuffer(content)) {
348                     return e;
349                 }
350                 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
351             }
352         }
353 
354         throw new UnsupportedOperationException(
355                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
356     }
357 
358     /**
359      * Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
360      * (We check this because otherwise we need to make it a non-composite buffer.)
361      */
362     private static boolean isSingleDirectBuffer(ByteBuf buf) {
363         return buf.isDirect() && buf.nioBufferCount() == 1;
364     }
365 
366     @Override
367     protected boolean continueOnWriteError() {
368         // Continue on write error as a DatagramChannel can write to multiple remote peers
369         //
370         // See https://github.com/netty/netty/issues/2665
371         return true;
372     }
373 
374     @Override
375     public InetSocketAddress localAddress() {
376         return (InetSocketAddress) super.localAddress();
377     }
378 
379     @Override
380     public InetSocketAddress remoteAddress() {
381         return (InetSocketAddress) super.remoteAddress();
382     }
383 
384     @Override
385     public ChannelFuture joinGroup(InetAddress multicastAddress) {
386         return joinGroup(multicastAddress, newPromise());
387     }
388 
389     @Override
390     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
391         try {
392             NetworkInterface iface = config.getNetworkInterface();
393             if (iface == null) {
394                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
395             }
396             return joinGroup(
397                     multicastAddress, iface, null, promise);
398         } catch (SocketException e) {
399             promise.setFailure(e);
400         }
401         return promise;
402     }
403 
404     @Override
405     public ChannelFuture joinGroup(
406             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
407         return joinGroup(multicastAddress, networkInterface, newPromise());
408     }
409 
410     @Override
411     public ChannelFuture joinGroup(
412             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
413             ChannelPromise promise) {
414         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
415     }
416 
417     @Override
418     public ChannelFuture joinGroup(
419             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
420         return joinGroup(multicastAddress, networkInterface, source, newPromise());
421     }
422 
423     @Override
424     public ChannelFuture joinGroup(
425             InetAddress multicastAddress, NetworkInterface networkInterface,
426             InetAddress source, ChannelPromise promise) {
427 
428         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
429         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
430 
431         try {
432             MembershipKey key;
433             if (source == null) {
434                 key = javaChannel().join(multicastAddress, networkInterface);
435             } else {
436                 key = javaChannel().join(multicastAddress, networkInterface, source);
437             }
438 
439             synchronized (this) {
440                 List<MembershipKey> keys = null;
441                 if (memberships == null) {
442                     memberships = new HashMap<InetAddress, List<MembershipKey>>();
443                 } else {
444                     keys = memberships.get(multicastAddress);
445                 }
446                 if (keys == null) {
447                     keys = new ArrayList<MembershipKey>();
448                     memberships.put(multicastAddress, keys);
449                 }
450                 keys.add(key);
451             }
452 
453             promise.setSuccess();
454         } catch (Throwable e) {
455             promise.setFailure(e);
456         }
457 
458         return promise;
459     }
460 
461     @Override
462     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
463         return leaveGroup(multicastAddress, newPromise());
464     }
465 
466     @Override
467     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
468         try {
469             return leaveGroup(
470                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
471         } catch (SocketException e) {
472             promise.setFailure(e);
473         }
474         return promise;
475     }
476 
477     @Override
478     public ChannelFuture leaveGroup(
479             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
480         return leaveGroup(multicastAddress, networkInterface, newPromise());
481     }
482 
483     @Override
484     public ChannelFuture leaveGroup(
485             InetSocketAddress multicastAddress,
486             NetworkInterface networkInterface, ChannelPromise promise) {
487         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
488     }
489 
490     @Override
491     public ChannelFuture leaveGroup(
492             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
493         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
494     }
495 
496     @Override
497     public ChannelFuture leaveGroup(
498             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
499             ChannelPromise promise) {
500 
501         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
502         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
503 
504         synchronized (this) {
505             if (memberships != null) {
506                 List<MembershipKey> keys = memberships.get(multicastAddress);
507                 if (keys != null) {
508                     Iterator<MembershipKey> keyIt = keys.iterator();
509 
510                     while (keyIt.hasNext()) {
511                         MembershipKey key = keyIt.next();
512                         if (networkInterface.equals(key.networkInterface())) {
513                            if (source == null && key.sourceAddress() == null ||
514                                source != null && source.equals(key.sourceAddress())) {
515                                key.drop();
516                                keyIt.remove();
517                            }
518                         }
519                     }
520                     if (keys.isEmpty()) {
521                         memberships.remove(multicastAddress);
522                     }
523                 }
524             }
525         }
526 
527         promise.setSuccess();
528         return promise;
529     }
530 
531     /**
532      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
533      */
534     @Override
535     public ChannelFuture block(
536             InetAddress multicastAddress, NetworkInterface networkInterface,
537             InetAddress sourceToBlock) {
538         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
539     }
540 
541     /**
542      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
543      */
544     @Override
545     public ChannelFuture block(
546             InetAddress multicastAddress, NetworkInterface networkInterface,
547             InetAddress sourceToBlock, ChannelPromise promise) {
548 
549         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
550         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
551         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
552 
553         synchronized (this) {
554             if (memberships != null) {
555                 List<MembershipKey> keys = memberships.get(multicastAddress);
556                 for (MembershipKey key: keys) {
557                     if (networkInterface.equals(key.networkInterface())) {
558                         try {
559                             key.block(sourceToBlock);
560                         } catch (IOException e) {
561                             promise.setFailure(e);
562                         }
563                     }
564                 }
565             }
566         }
567         promise.setSuccess();
568         return promise;
569     }
570 
571     /**
572      * Block the given sourceToBlock address for the given multicastAddress
573      *
574      */
575     @Override
576     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
577         return block(multicastAddress, sourceToBlock, newPromise());
578     }
579 
580     /**
581      * Block the given sourceToBlock address for the given multicastAddress
582      *
583      */
584     @Override
585     public ChannelFuture block(
586             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
587         try {
588             return block(
589                     multicastAddress,
590                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
591                     sourceToBlock, promise);
592         } catch (SocketException e) {
593             promise.setFailure(e);
594         }
595         return promise;
596     }
597 
598     @Override
599     @Deprecated
600     protected void setReadPending(boolean readPending) {
601         super.setReadPending(readPending);
602     }
603 
604     void clearReadPending0() {
605         clearReadPending();
606     }
607 
608     @Override
609     protected boolean closeOnReadError(Throwable cause) {
610         // We do not want to close on SocketException when using DatagramChannel as we usually can continue receiving.
611         // See https://github.com/netty/netty/issues/5893
612         if (cause instanceof SocketException) {
613             return false;
614         }
615         return super.closeOnReadError(cause);
616     }
617 
618     @Override
619     protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
620         if (allocHandle instanceof RecvByteBufAllocator.ExtendedHandle) {
621             // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
622             // as we read anything).
623             return ((RecvByteBufAllocator.ExtendedHandle) allocHandle)
624                     .continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER);
625         }
626         return allocHandle.continueReading();
627     }
628 }