View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelException;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.nio.AbstractNioMessageChannel;
30  import io.netty.channel.socket.DatagramChannelConfig;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.socket.InternetProtocolFamily;
33  import io.netty.util.UncheckedBooleanSupplier;
34  import io.netty.util.internal.ObjectUtil;
35  import io.netty.util.internal.SocketUtils;
36  import io.netty.util.internal.PlatformDependent;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.SuppressJava6Requirement;
39  
40  import java.io.IOException;
41  import java.net.InetAddress;
42  import java.net.InetSocketAddress;
43  import java.net.NetworkInterface;
44  import java.net.SocketAddress;
45  import java.net.SocketException;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.DatagramChannel;
48  import java.nio.channels.MembershipKey;
49  import java.nio.channels.SelectionKey;
50  import java.nio.channels.spi.SelectorProvider;
51  import java.util.ArrayList;
52  import java.util.HashMap;
53  import java.util.Iterator;
54  import java.util.List;
55  import java.util.Map;
56  
57  /**
58   * An NIO datagram {@link Channel} that sends and receives an
59   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
60   *
61   * @see AddressedEnvelope
62   * @see DatagramPacket
63   */
64  public final class NioDatagramChannel
65          extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
66  
67      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
68      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
69      private static final String EXPECTED_TYPES =
70              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
71              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
72              StringUtil.simpleClassName(ByteBuf.class) + ", " +
73              StringUtil.simpleClassName(SocketAddress.class) + ">, " +
74              StringUtil.simpleClassName(ByteBuf.class) + ')';
75  
76      private final DatagramChannelConfig config;
77  
78      private Map<InetAddress, List<MembershipKey>> memberships;
79  
80      private static DatagramChannel newSocket(SelectorProvider provider) {
81          try {
82              /**
83               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
84               *  {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
85               *
86               *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
87               */
88              return provider.openDatagramChannel();
89          } catch (IOException e) {
90              throw new ChannelException("Failed to open a socket.", e);
91          }
92      }
93  
94      @SuppressJava6Requirement(reason = "Usage guarded by java version check")
95      private static DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
96          if (ipFamily == null) {
97              return newSocket(provider);
98          }
99  
100         checkJavaVersion();
101 
102         try {
103             return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
104         } catch (IOException e) {
105             throw new ChannelException("Failed to open a socket.", e);
106         }
107     }
108 
109     private static void checkJavaVersion() {
110         if (PlatformDependent.javaVersion() < 7) {
111             throw new UnsupportedOperationException("Only supported on java 7+.");
112         }
113     }
114 
115     /**
116      * Create a new instance which will use the Operation Systems default {@link InternetProtocolFamily}.
117      */
118     public NioDatagramChannel() {
119         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
120     }
121 
122     /**
123      * Create a new instance using the given {@link SelectorProvider}
124      * which will use the Operation Systems default {@link InternetProtocolFamily}.
125      */
126     public NioDatagramChannel(SelectorProvider provider) {
127         this(newSocket(provider));
128     }
129 
130     /**
131      * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
132      * on the Operation Systems default which will be chosen.
133      */
134     public NioDatagramChannel(InternetProtocolFamily ipFamily) {
135         this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
136     }
137 
138     /**
139      * Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
140      * If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default
141      * which will be chosen.
142      */
143     public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
144         this(newSocket(provider, ipFamily));
145     }
146 
147     /**
148      * Create a new instance from the given {@link DatagramChannel}.
149      */
150     public NioDatagramChannel(DatagramChannel socket) {
151         super(null, socket, SelectionKey.OP_READ);
152         config = new NioDatagramChannelConfig(this, socket);
153     }
154 
155     @Override
156     public ChannelMetadata metadata() {
157         return METADATA;
158     }
159 
160     @Override
161     public DatagramChannelConfig config() {
162         return config;
163     }
164 
165     @Override
166     @SuppressWarnings("deprecation")
167     public boolean isActive() {
168         DatagramChannel ch = javaChannel();
169         return ch.isOpen() && (
170                 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
171                 || ch.socket().isBound());
172     }
173 
174     @Override
175     public boolean isConnected() {
176         return javaChannel().isConnected();
177     }
178 
179     @Override
180     protected DatagramChannel javaChannel() {
181         return (DatagramChannel) super.javaChannel();
182     }
183 
184     @Override
185     protected SocketAddress localAddress0() {
186         return javaChannel().socket().getLocalSocketAddress();
187     }
188 
189     @Override
190     protected SocketAddress remoteAddress0() {
191         return javaChannel().socket().getRemoteSocketAddress();
192     }
193 
194     @Override
195     protected void doBind(SocketAddress localAddress) throws Exception {
196         doBind0(localAddress);
197     }
198 
199     private void doBind0(SocketAddress localAddress) throws Exception {
200         if (PlatformDependent.javaVersion() >= 7) {
201             SocketUtils.bind(javaChannel(), localAddress);
202         } else {
203             javaChannel().socket().bind(localAddress);
204         }
205     }
206 
207     @Override
208     protected boolean doConnect(SocketAddress remoteAddress,
209             SocketAddress localAddress) throws Exception {
210         if (localAddress != null) {
211             doBind0(localAddress);
212         }
213 
214         boolean success = false;
215         try {
216             javaChannel().connect(remoteAddress);
217             success = true;
218             return true;
219         } finally {
220             if (!success) {
221                 doClose();
222             }
223         }
224     }
225 
226     @Override
227     protected void doFinishConnect() throws Exception {
228         throw new Error();
229     }
230 
231     @Override
232     protected void doDisconnect() throws Exception {
233         javaChannel().disconnect();
234     }
235 
236     @Override
237     protected void doClose() throws Exception {
238         javaChannel().close();
239     }
240 
241     @Override
242     protected int doReadMessages(List<Object> buf) throws Exception {
243         DatagramChannel ch = javaChannel();
244         DatagramChannelConfig config = config();
245         RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
246 
247         ByteBuf data = allocHandle.allocate(config.getAllocator());
248         allocHandle.attemptedBytesRead(data.writableBytes());
249         boolean free = true;
250         try {
251             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
252             int pos = nioData.position();
253             InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
254             if (remoteAddress == null) {
255                 return 0;
256             }
257 
258             allocHandle.lastBytesRead(nioData.position() - pos);
259             buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
260                     localAddress(), remoteAddress));
261             free = false;
262             return 1;
263         } catch (Throwable cause) {
264             PlatformDependent.throwException(cause);
265             return -1;
266         }  finally {
267             if (free) {
268                 data.release();
269             }
270         }
271     }
272 
273     @Override
274     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
275         final SocketAddress remoteAddress;
276         final ByteBuf data;
277         if (msg instanceof AddressedEnvelope) {
278             @SuppressWarnings("unchecked")
279             AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
280             remoteAddress = envelope.recipient();
281             data = envelope.content();
282         } else {
283             data = (ByteBuf) msg;
284             remoteAddress = null;
285         }
286 
287         final int dataLen = data.readableBytes();
288         if (dataLen == 0) {
289             return true;
290         }
291 
292         final ByteBuffer nioData = data.nioBufferCount() == 1 ? data.internalNioBuffer(data.readerIndex(), dataLen)
293                                                               : data.nioBuffer(data.readerIndex(), dataLen);
294         final int writtenBytes;
295         if (remoteAddress != null) {
296             writtenBytes = javaChannel().send(nioData, remoteAddress);
297         } else {
298             writtenBytes = javaChannel().write(nioData);
299         }
300         return writtenBytes > 0;
301     }
302 
303     @Override
304     protected Object filterOutboundMessage(Object msg) {
305         if (msg instanceof DatagramPacket) {
306             DatagramPacket p = (DatagramPacket) msg;
307             ByteBuf content = p.content();
308             if (isSingleDirectBuffer(content)) {
309                 return p;
310             }
311             return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
312         }
313 
314         if (msg instanceof ByteBuf) {
315             ByteBuf buf = (ByteBuf) msg;
316             if (isSingleDirectBuffer(buf)) {
317                 return buf;
318             }
319             return newDirectBuffer(buf);
320         }
321 
322         if (msg instanceof AddressedEnvelope) {
323             @SuppressWarnings("unchecked")
324             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
325             if (e.content() instanceof ByteBuf) {
326                 ByteBuf content = (ByteBuf) e.content();
327                 if (isSingleDirectBuffer(content)) {
328                     return e;
329                 }
330                 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
331             }
332         }
333 
334         throw new UnsupportedOperationException(
335                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
336     }
337 
338     /**
339      * Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
340      * (We check this because otherwise we need to make it a non-composite buffer.)
341      */
342     private static boolean isSingleDirectBuffer(ByteBuf buf) {
343         return buf.isDirect() && buf.nioBufferCount() == 1;
344     }
345 
346     @Override
347     protected boolean continueOnWriteError() {
348         // Continue on write error as a DatagramChannel can write to multiple remote peers
349         //
350         // See https://github.com/netty/netty/issues/2665
351         return true;
352     }
353 
354     @Override
355     public InetSocketAddress localAddress() {
356         return (InetSocketAddress) super.localAddress();
357     }
358 
359     @Override
360     public InetSocketAddress remoteAddress() {
361         return (InetSocketAddress) super.remoteAddress();
362     }
363 
364     @Override
365     public ChannelFuture joinGroup(InetAddress multicastAddress) {
366         return joinGroup(multicastAddress, newPromise());
367     }
368 
369     @Override
370     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
371         try {
372             NetworkInterface iface = config.getNetworkInterface();
373             if (iface == null) {
374                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
375             }
376             return joinGroup(
377                     multicastAddress, iface, null, promise);
378         } catch (SocketException e) {
379             promise.setFailure(e);
380         }
381         return promise;
382     }
383 
384     @Override
385     public ChannelFuture joinGroup(
386             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
387         return joinGroup(multicastAddress, networkInterface, newPromise());
388     }
389 
390     @Override
391     public ChannelFuture joinGroup(
392             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
393             ChannelPromise promise) {
394         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
395     }
396 
397     @Override
398     public ChannelFuture joinGroup(
399             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
400         return joinGroup(multicastAddress, networkInterface, source, newPromise());
401     }
402 
403     @SuppressJava6Requirement(reason = "Usage guarded by java version check")
404     @Override
405     public ChannelFuture joinGroup(
406             InetAddress multicastAddress, NetworkInterface networkInterface,
407             InetAddress source, ChannelPromise promise) {
408 
409         checkJavaVersion();
410 
411         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
412         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
413 
414         try {
415             MembershipKey key;
416             if (source == null) {
417                 key = javaChannel().join(multicastAddress, networkInterface);
418             } else {
419                 key = javaChannel().join(multicastAddress, networkInterface, source);
420             }
421 
422             synchronized (this) {
423                 List<MembershipKey> keys = null;
424                 if (memberships == null) {
425                     memberships = new HashMap<InetAddress, List<MembershipKey>>();
426                 } else {
427                     keys = memberships.get(multicastAddress);
428                 }
429                 if (keys == null) {
430                     keys = new ArrayList<MembershipKey>();
431                     memberships.put(multicastAddress, keys);
432                 }
433                 keys.add(key);
434             }
435 
436             promise.setSuccess();
437         } catch (Throwable e) {
438             promise.setFailure(e);
439         }
440 
441         return promise;
442     }
443 
444     @Override
445     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
446         return leaveGroup(multicastAddress, newPromise());
447     }
448 
449     @Override
450     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
451         try {
452             return leaveGroup(
453                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
454         } catch (SocketException e) {
455             promise.setFailure(e);
456         }
457         return promise;
458     }
459 
460     @Override
461     public ChannelFuture leaveGroup(
462             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
463         return leaveGroup(multicastAddress, networkInterface, newPromise());
464     }
465 
466     @Override
467     public ChannelFuture leaveGroup(
468             InetSocketAddress multicastAddress,
469             NetworkInterface networkInterface, ChannelPromise promise) {
470         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
471     }
472 
473     @Override
474     public ChannelFuture leaveGroup(
475             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
476         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
477     }
478 
479     @SuppressJava6Requirement(reason = "Usage guarded by java version check")
480     @Override
481     public ChannelFuture leaveGroup(
482             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
483             ChannelPromise promise) {
484         checkJavaVersion();
485 
486         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
487         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
488 
489         synchronized (this) {
490             if (memberships != null) {
491                 List<MembershipKey> keys = memberships.get(multicastAddress);
492                 if (keys != null) {
493                     Iterator<MembershipKey> keyIt = keys.iterator();
494 
495                     while (keyIt.hasNext()) {
496                         MembershipKey key = keyIt.next();
497                         if (networkInterface.equals(key.networkInterface())) {
498                            if (source == null && key.sourceAddress() == null ||
499                                source != null && source.equals(key.sourceAddress())) {
500                                key.drop();
501                                keyIt.remove();
502                            }
503                         }
504                     }
505                     if (keys.isEmpty()) {
506                         memberships.remove(multicastAddress);
507                     }
508                 }
509             }
510         }
511 
512         promise.setSuccess();
513         return promise;
514     }
515 
516     /**
517      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
518      */
519     @Override
520     public ChannelFuture block(
521             InetAddress multicastAddress, NetworkInterface networkInterface,
522             InetAddress sourceToBlock) {
523         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
524     }
525 
526     /**
527      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
528      */
529     @SuppressJava6Requirement(reason = "Usage guarded by java version check")
530     @Override
531     public ChannelFuture block(
532             InetAddress multicastAddress, NetworkInterface networkInterface,
533             InetAddress sourceToBlock, ChannelPromise promise) {
534         checkJavaVersion();
535 
536         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
537         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
538         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
539 
540         synchronized (this) {
541             if (memberships != null) {
542                 List<MembershipKey> keys = memberships.get(multicastAddress);
543                 for (MembershipKey key: keys) {
544                     if (networkInterface.equals(key.networkInterface())) {
545                         try {
546                             key.block(sourceToBlock);
547                         } catch (IOException e) {
548                             promise.setFailure(e);
549                         }
550                     }
551                 }
552             }
553         }
554         promise.setSuccess();
555         return promise;
556     }
557 
558     /**
559      * Block the given sourceToBlock address for the given multicastAddress
560      *
561      */
562     @Override
563     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
564         return block(multicastAddress, sourceToBlock, newPromise());
565     }
566 
567     /**
568      * Block the given sourceToBlock address for the given multicastAddress
569      *
570      */
571     @Override
572     public ChannelFuture block(
573             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
574         try {
575             return block(
576                     multicastAddress,
577                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
578                     sourceToBlock, promise);
579         } catch (SocketException e) {
580             promise.setFailure(e);
581         }
582         return promise;
583     }
584 
585     @Override
586     @Deprecated
587     protected void setReadPending(boolean readPending) {
588         super.setReadPending(readPending);
589     }
590 
591     void clearReadPending0() {
592         clearReadPending();
593     }
594 
595     @Override
596     protected boolean closeOnReadError(Throwable cause) {
597         // We do not want to close on SocketException when using DatagramChannel as we usually can continue receiving.
598         // See https://github.com/netty/netty/issues/5893
599         if (cause instanceof SocketException) {
600             return false;
601         }
602         return super.closeOnReadError(cause);
603     }
604 
605     @Override
606     protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
607         if (allocHandle instanceof RecvByteBufAllocator.ExtendedHandle) {
608             // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
609             // as we read anything).
610             return ((RecvByteBufAllocator.ExtendedHandle) allocHandle)
611                     .continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER);
612         }
613         return allocHandle.continueReading();
614     }
615 }