View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelException;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.nio.AbstractNioMessageChannel;
30  import io.netty.channel.socket.DatagramChannelConfig;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.socket.InternetProtocolFamily;
33  import io.netty.util.UncheckedBooleanSupplier;
34  import io.netty.util.internal.ObjectUtil;
35  import io.netty.util.internal.SocketUtils;
36  import io.netty.util.internal.PlatformDependent;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.SuppressJava6Requirement;
39  
40  import java.io.IOException;
41  import java.net.InetAddress;
42  import java.net.InetSocketAddress;
43  import java.net.NetworkInterface;
44  import java.net.SocketAddress;
45  import java.net.SocketException;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.DatagramChannel;
48  import java.nio.channels.MembershipKey;
49  import java.nio.channels.SelectionKey;
50  import java.nio.channels.UnresolvedAddressException;
51  import java.nio.channels.spi.SelectorProvider;
52  import java.util.ArrayList;
53  import java.util.HashMap;
54  import java.util.Iterator;
55  import java.util.List;
56  import java.util.Map;
57  
58  /**
59   * An NIO datagram {@link Channel} that sends and receives an
60   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
61   *
62   * @see AddressedEnvelope
63   * @see DatagramPacket
64   */
65  public final class NioDatagramChannel
66          extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
67  
68      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
69      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
70      private static final String EXPECTED_TYPES =
71              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
72              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
73              StringUtil.simpleClassName(ByteBuf.class) + ", " +
74              StringUtil.simpleClassName(SocketAddress.class) + ">, " +
75              StringUtil.simpleClassName(ByteBuf.class) + ')';
76  
77      private final DatagramChannelConfig config;
78  
79      private Map<InetAddress, List<MembershipKey>> memberships;
80  
81      private static DatagramChannel newSocket(SelectorProvider provider) {
82          try {
83              /**
84               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
85               *  {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
86               *
87               *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
88               */
89              return provider.openDatagramChannel();
90          } catch (IOException e) {
91              throw new ChannelException("Failed to open a socket.", e);
92          }
93      }
94  
95      @SuppressJava6Requirement(reason = "Usage guarded by java version check")
96      private static DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
97          if (ipFamily == null) {
98              return newSocket(provider);
99          }
100 
101         checkJavaVersion();
102 
103         try {
104             return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
105         } catch (IOException e) {
106             throw new ChannelException("Failed to open a socket.", e);
107         }
108     }
109 
110     private static void checkJavaVersion() {
111         if (PlatformDependent.javaVersion() < 7) {
112             throw new UnsupportedOperationException("Only supported on java 7+.");
113         }
114     }
115 
116     /**
117      * Create a new instance which will use the Operation Systems default {@link InternetProtocolFamily}.
118      */
119     public NioDatagramChannel() {
120         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
121     }
122 
123     /**
124      * Create a new instance using the given {@link SelectorProvider}
125      * which will use the Operation Systems default {@link InternetProtocolFamily}.
126      */
127     public NioDatagramChannel(SelectorProvider provider) {
128         this(newSocket(provider));
129     }
130 
131     /**
132      * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
133      * on the Operation Systems default which will be chosen.
134      */
135     public NioDatagramChannel(InternetProtocolFamily ipFamily) {
136         this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
137     }
138 
139     /**
140      * Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
141      * If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default
142      * which will be chosen.
143      */
144     public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
145         this(newSocket(provider, ipFamily));
146     }
147 
148     /**
149      * Create a new instance from the given {@link DatagramChannel}.
150      */
151     public NioDatagramChannel(DatagramChannel socket) {
152         super(null, socket, SelectionKey.OP_READ);
153         config = new NioDatagramChannelConfig(this, socket);
154     }
155 
156     @Override
157     public ChannelMetadata metadata() {
158         return METADATA;
159     }
160 
161     @Override
162     public DatagramChannelConfig config() {
163         return config;
164     }
165 
166     @Override
167     @SuppressWarnings("deprecation")
168     public boolean isActive() {
169         DatagramChannel ch = javaChannel();
170         return ch.isOpen() && (
171                 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
172                 || ch.socket().isBound());
173     }
174 
175     @Override
176     public boolean isConnected() {
177         return javaChannel().isConnected();
178     }
179 
180     @Override
181     protected DatagramChannel javaChannel() {
182         return (DatagramChannel) super.javaChannel();
183     }
184 
185     @Override
186     protected SocketAddress localAddress0() {
187         return javaChannel().socket().getLocalSocketAddress();
188     }
189 
190     @Override
191     protected SocketAddress remoteAddress0() {
192         return javaChannel().socket().getRemoteSocketAddress();
193     }
194 
195     @Override
196     protected void doBind(SocketAddress localAddress) throws Exception {
197         doBind0(localAddress);
198     }
199 
200     private void doBind0(SocketAddress localAddress) throws Exception {
201         if (PlatformDependent.javaVersion() >= 7) {
202             SocketUtils.bind(javaChannel(), localAddress);
203         } else {
204             javaChannel().socket().bind(localAddress);
205         }
206     }
207 
208     @Override
209     protected boolean doConnect(SocketAddress remoteAddress,
210             SocketAddress localAddress) throws Exception {
211         if (localAddress != null) {
212             doBind0(localAddress);
213         }
214 
215         boolean success = false;
216         try {
217             javaChannel().connect(remoteAddress);
218             success = true;
219             return true;
220         } finally {
221             if (!success) {
222                 doClose();
223             }
224         }
225     }
226 
227     @Override
228     protected void doFinishConnect() throws Exception {
229         throw new Error();
230     }
231 
232     @Override
233     protected void doDisconnect() throws Exception {
234         javaChannel().disconnect();
235     }
236 
237     @Override
238     protected void doClose() throws Exception {
239         javaChannel().close();
240     }
241 
242     @Override
243     protected int doReadMessages(List<Object> buf) throws Exception {
244         DatagramChannel ch = javaChannel();
245         DatagramChannelConfig config = config();
246         RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
247 
248         ByteBuf data = allocHandle.allocate(config.getAllocator());
249         allocHandle.attemptedBytesRead(data.writableBytes());
250         boolean free = true;
251         try {
252             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
253             int pos = nioData.position();
254             InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
255             if (remoteAddress == null) {
256                 return 0;
257             }
258 
259             allocHandle.lastBytesRead(nioData.position() - pos);
260             buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
261                     localAddress(), remoteAddress));
262             free = false;
263             return 1;
264         } catch (Throwable cause) {
265             PlatformDependent.throwException(cause);
266             return -1;
267         }  finally {
268             if (free) {
269                 data.release();
270             }
271         }
272     }
273 
274     @Override
275     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
276         final SocketAddress remoteAddress;
277         final ByteBuf data;
278         if (msg instanceof AddressedEnvelope) {
279             @SuppressWarnings("unchecked")
280             AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
281             remoteAddress = envelope.recipient();
282             data = envelope.content();
283         } else {
284             data = (ByteBuf) msg;
285             remoteAddress = null;
286         }
287 
288         final int dataLen = data.readableBytes();
289         if (dataLen == 0) {
290             return true;
291         }
292 
293         final ByteBuffer nioData = data.nioBufferCount() == 1 ? data.internalNioBuffer(data.readerIndex(), dataLen)
294                                                               : data.nioBuffer(data.readerIndex(), dataLen);
295         final int writtenBytes;
296         if (remoteAddress != null) {
297             writtenBytes = javaChannel().send(nioData, remoteAddress);
298         } else {
299             writtenBytes = javaChannel().write(nioData);
300         }
301         return writtenBytes > 0;
302     }
303 
304     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
305         if (envelope.recipient() instanceof InetSocketAddress
306                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
307             throw new UnresolvedAddressException();
308         }
309     }
310 
311     @Override
312     protected Object filterOutboundMessage(Object msg) {
313         if (msg instanceof DatagramPacket) {
314             DatagramPacket p = (DatagramPacket) msg;
315             checkUnresolved(p);
316             ByteBuf content = p.content();
317             if (isSingleDirectBuffer(content)) {
318                 return p;
319             }
320             return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
321         }
322 
323         if (msg instanceof ByteBuf) {
324             ByteBuf buf = (ByteBuf) msg;
325             if (isSingleDirectBuffer(buf)) {
326                 return buf;
327             }
328             return newDirectBuffer(buf);
329         }
330 
331         if (msg instanceof AddressedEnvelope) {
332             @SuppressWarnings("unchecked")
333             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
334             checkUnresolved(e);
335             if (e.content() instanceof ByteBuf) {
336                 ByteBuf content = (ByteBuf) e.content();
337                 if (isSingleDirectBuffer(content)) {
338                     return e;
339                 }
340                 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
341             }
342         }
343 
344         throw new UnsupportedOperationException(
345                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
346     }
347 
348     /**
349      * Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
350      * (We check this because otherwise we need to make it a non-composite buffer.)
351      */
352     private static boolean isSingleDirectBuffer(ByteBuf buf) {
353         return buf.isDirect() && buf.nioBufferCount() == 1;
354     }
355 
356     @Override
357     protected boolean continueOnWriteError() {
358         // Continue on write error as a DatagramChannel can write to multiple remote peers
359         //
360         // See https://github.com/netty/netty/issues/2665
361         return true;
362     }
363 
364     @Override
365     public InetSocketAddress localAddress() {
366         return (InetSocketAddress) super.localAddress();
367     }
368 
369     @Override
370     public InetSocketAddress remoteAddress() {
371         return (InetSocketAddress) super.remoteAddress();
372     }
373 
374     @Override
375     public ChannelFuture joinGroup(InetAddress multicastAddress) {
376         return joinGroup(multicastAddress, newPromise());
377     }
378 
379     @Override
380     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
381         try {
382             NetworkInterface iface = config.getNetworkInterface();
383             if (iface == null) {
384                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
385             }
386             return joinGroup(
387                     multicastAddress, iface, null, promise);
388         } catch (SocketException e) {
389             promise.setFailure(e);
390         }
391         return promise;
392     }
393 
394     @Override
395     public ChannelFuture joinGroup(
396             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
397         return joinGroup(multicastAddress, networkInterface, newPromise());
398     }
399 
400     @Override
401     public ChannelFuture joinGroup(
402             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
403             ChannelPromise promise) {
404         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
405     }
406 
407     @Override
408     public ChannelFuture joinGroup(
409             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
410         return joinGroup(multicastAddress, networkInterface, source, newPromise());
411     }
412 
413     @SuppressJava6Requirement(reason = "Usage guarded by java version check")
414     @Override
415     public ChannelFuture joinGroup(
416             InetAddress multicastAddress, NetworkInterface networkInterface,
417             InetAddress source, ChannelPromise promise) {
418 
419         checkJavaVersion();
420 
421         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
422         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
423 
424         try {
425             MembershipKey key;
426             if (source == null) {
427                 key = javaChannel().join(multicastAddress, networkInterface);
428             } else {
429                 key = javaChannel().join(multicastAddress, networkInterface, source);
430             }
431 
432             synchronized (this) {
433                 List<MembershipKey> keys = null;
434                 if (memberships == null) {
435                     memberships = new HashMap<InetAddress, List<MembershipKey>>();
436                 } else {
437                     keys = memberships.get(multicastAddress);
438                 }
439                 if (keys == null) {
440                     keys = new ArrayList<MembershipKey>();
441                     memberships.put(multicastAddress, keys);
442                 }
443                 keys.add(key);
444             }
445 
446             promise.setSuccess();
447         } catch (Throwable e) {
448             promise.setFailure(e);
449         }
450 
451         return promise;
452     }
453 
454     @Override
455     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
456         return leaveGroup(multicastAddress, newPromise());
457     }
458 
459     @Override
460     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
461         try {
462             return leaveGroup(
463                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
464         } catch (SocketException e) {
465             promise.setFailure(e);
466         }
467         return promise;
468     }
469 
470     @Override
471     public ChannelFuture leaveGroup(
472             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
473         return leaveGroup(multicastAddress, networkInterface, newPromise());
474     }
475 
476     @Override
477     public ChannelFuture leaveGroup(
478             InetSocketAddress multicastAddress,
479             NetworkInterface networkInterface, ChannelPromise promise) {
480         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
481     }
482 
483     @Override
484     public ChannelFuture leaveGroup(
485             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
486         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
487     }
488 
489     @SuppressJava6Requirement(reason = "Usage guarded by java version check")
490     @Override
491     public ChannelFuture leaveGroup(
492             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
493             ChannelPromise promise) {
494         checkJavaVersion();
495 
496         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
497         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
498 
499         synchronized (this) {
500             if (memberships != null) {
501                 List<MembershipKey> keys = memberships.get(multicastAddress);
502                 if (keys != null) {
503                     Iterator<MembershipKey> keyIt = keys.iterator();
504 
505                     while (keyIt.hasNext()) {
506                         MembershipKey key = keyIt.next();
507                         if (networkInterface.equals(key.networkInterface())) {
508                            if (source == null && key.sourceAddress() == null ||
509                                source != null && source.equals(key.sourceAddress())) {
510                                key.drop();
511                                keyIt.remove();
512                            }
513                         }
514                     }
515                     if (keys.isEmpty()) {
516                         memberships.remove(multicastAddress);
517                     }
518                 }
519             }
520         }
521 
522         promise.setSuccess();
523         return promise;
524     }
525 
526     /**
527      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
528      */
529     @Override
530     public ChannelFuture block(
531             InetAddress multicastAddress, NetworkInterface networkInterface,
532             InetAddress sourceToBlock) {
533         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
534     }
535 
536     /**
537      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
538      */
539     @SuppressJava6Requirement(reason = "Usage guarded by java version check")
540     @Override
541     public ChannelFuture block(
542             InetAddress multicastAddress, NetworkInterface networkInterface,
543             InetAddress sourceToBlock, ChannelPromise promise) {
544         checkJavaVersion();
545 
546         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
547         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
548         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
549 
550         synchronized (this) {
551             if (memberships != null) {
552                 List<MembershipKey> keys = memberships.get(multicastAddress);
553                 for (MembershipKey key: keys) {
554                     if (networkInterface.equals(key.networkInterface())) {
555                         try {
556                             key.block(sourceToBlock);
557                         } catch (IOException e) {
558                             promise.setFailure(e);
559                         }
560                     }
561                 }
562             }
563         }
564         promise.setSuccess();
565         return promise;
566     }
567 
568     /**
569      * Block the given sourceToBlock address for the given multicastAddress
570      *
571      */
572     @Override
573     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
574         return block(multicastAddress, sourceToBlock, newPromise());
575     }
576 
577     /**
578      * Block the given sourceToBlock address for the given multicastAddress
579      *
580      */
581     @Override
582     public ChannelFuture block(
583             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
584         try {
585             return block(
586                     multicastAddress,
587                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
588                     sourceToBlock, promise);
589         } catch (SocketException e) {
590             promise.setFailure(e);
591         }
592         return promise;
593     }
594 
595     @Override
596     @Deprecated
597     protected void setReadPending(boolean readPending) {
598         super.setReadPending(readPending);
599     }
600 
601     void clearReadPending0() {
602         clearReadPending();
603     }
604 
605     @Override
606     protected boolean closeOnReadError(Throwable cause) {
607         // We do not want to close on SocketException when using DatagramChannel as we usually can continue receiving.
608         // See https://github.com/netty/netty/issues/5893
609         if (cause instanceof SocketException) {
610             return false;
611         }
612         return super.closeOnReadError(cause);
613     }
614 
615     @Override
616     protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
617         if (allocHandle instanceof RecvByteBufAllocator.ExtendedHandle) {
618             // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
619             // as we read anything).
620             return ((RecvByteBufAllocator.ExtendedHandle) allocHandle)
621                     .continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER);
622         }
623         return allocHandle.continueReading();
624     }
625 }