View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelException;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.nio.AbstractNioMessageChannel;
30  import io.netty.channel.socket.DatagramChannelConfig;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.socket.InternetProtocolFamily;
33  import io.netty.util.internal.SocketUtils;
34  import io.netty.util.internal.PlatformDependent;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.UnstableApi;
37  
38  import java.io.IOException;
39  import java.net.InetAddress;
40  import java.net.InetSocketAddress;
41  import java.net.NetworkInterface;
42  import java.net.SocketAddress;
43  import java.net.SocketException;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.DatagramChannel;
46  import java.nio.channels.MembershipKey;
47  import java.nio.channels.SelectionKey;
48  import java.nio.channels.spi.SelectorProvider;
49  import java.util.ArrayList;
50  import java.util.HashMap;
51  import java.util.Iterator;
52  import java.util.List;
53  import java.util.Map;
54  
55  /**
56   * An NIO datagram {@link Channel} that sends and receives an
57   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
58   *
59   * @see AddressedEnvelope
60   * @see DatagramPacket
61   */
62  public final class NioDatagramChannel
63          extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
64  
    /** Metadata instance returned by {@link #metadata()} for every channel of this type. */
    private static final ChannelMetadata METADATA = new ChannelMetadata(true);
    /** Provider looked up once and reused by the constructors that are not handed an explicit one. */
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    /** Suffix appended to the "unsupported message type" error raised by {@link #filterOutboundMessage(Object)}. */
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
            StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
            StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(SocketAddress.class) + ">, " +
            StringUtil.simpleClassName(ByteBuf.class) + ')';

    /** Configuration bound to this channel; assigned once in the constructor. */
    private final DatagramChannelConfig config;

    /**
     * Multicast memberships keyed by group address. Created lazily on the first successful join and only
     * accessed while holding this channel's monitor.
     */
    private Map<InetAddress, List<MembershipKey>> memberships;
77  
78      private static DatagramChannel newSocket(SelectorProvider provider) {
79          try {
80              /**
81               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
82               *  {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
83               *
84               *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
85               */
86              return provider.openDatagramChannel();
87          } catch (IOException e) {
88              throw new ChannelException("Failed to open a socket.", e);
89          }
90      }
91  
92      private static DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
93          if (ipFamily == null) {
94              return newSocket(provider);
95          }
96  
97          checkJavaVersion();
98  
99          try {
100             return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
101         } catch (IOException e) {
102             throw new ChannelException("Failed to open a socket.", e);
103         }
104     }
105 
106     private static void checkJavaVersion() {
107         if (PlatformDependent.javaVersion() < 7) {
108             throw new UnsupportedOperationException("Only supported on java 7+.");
109         }
110     }
111 
112     /**
113      * Create a new instance which will use the Operation Systems default {@link InternetProtocolFamily}.
114      */
115     public NioDatagramChannel() {
116         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
117     }
118 
119     /**
120      * Create a new instance using the given {@link SelectorProvider}
121      * which will use the Operation Systems default {@link InternetProtocolFamily}.
122      */
123     public NioDatagramChannel(SelectorProvider provider) {
124         this(newSocket(provider));
125     }
126 
127     /**
128      * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
129      * on the Operation Systems default which will be chosen.
130      */
131     public NioDatagramChannel(InternetProtocolFamily ipFamily) {
132         this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
133     }
134 
    /**
     * Creates a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
     *
     * @param provider the provider used to open the underlying channel
     * @param ipFamily the protocol family to use, or {@code null} to let the operating system's default apply
     */
    public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
        this(newSocket(provider, ipFamily));
    }
143 
144     /**
145      * Create a new instance from the given {@link DatagramChannel}.
146      */
147     public NioDatagramChannel(DatagramChannel socket) {
148         super(null, socket, SelectionKey.OP_READ);
149         config = new NioDatagramChannelConfig(this, socket);
150     }
151 
    /**
     * Returns the {@link ChannelMetadata} constant shared by every instance of this class.
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
156 
    /**
     * Returns the configuration object that was created for this channel in its constructor.
     */
    @Override
    public DatagramChannelConfig config() {
        return config;
    }
161 
162     @Override
163     @SuppressWarnings("deprecation")
164     public boolean isActive() {
165         DatagramChannel ch = javaChannel();
166         return ch.isOpen() && (
167                 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
168                 || ch.socket().isBound());
169     }
170 
171     @Override
172     public boolean isConnected() {
173         return javaChannel().isConnected();
174     }
175 
    /**
     * Returns the superclass's underlying channel, narrowed to {@link DatagramChannel}.
     */
    @Override
    protected DatagramChannel javaChannel() {
        return (DatagramChannel) super.javaChannel();
    }
180 
181     @Override
182     protected SocketAddress localAddress0() {
183         return javaChannel().socket().getLocalSocketAddress();
184     }
185 
186     @Override
187     protected SocketAddress remoteAddress0() {
188         return javaChannel().socket().getRemoteSocketAddress();
189     }
190 
    /**
     * Binds the underlying channel to {@code localAddress} by delegating to {@code doBind0}.
     */
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        doBind0(localAddress);
    }
195 
196     private void doBind0(SocketAddress localAddress) throws Exception {
197         if (PlatformDependent.javaVersion() >= 7) {
198             SocketUtils.bind(javaChannel(), localAddress);
199         } else {
200             javaChannel().socket().bind(localAddress);
201         }
202     }
203 
204     @Override
205     protected boolean doConnect(SocketAddress remoteAddress,
206             SocketAddress localAddress) throws Exception {
207         if (localAddress != null) {
208             doBind0(localAddress);
209         }
210 
211         boolean success = false;
212         try {
213             javaChannel().connect(remoteAddress);
214             success = true;
215             return true;
216         } finally {
217             if (!success) {
218                 doClose();
219             }
220         }
221     }
222 
    /**
     * Always throws {@link Error}.
     * NOTE(review): doConnect in this class returns {@code true} whenever it returns normally, so this
     * method is presumably never expected to be invoked; confirm against the superclass contract.
     */
    @Override
    protected void doFinishConnect() throws Exception {
        throw new Error();
    }
227 
228     @Override
229     protected void doDisconnect() throws Exception {
230         javaChannel().disconnect();
231     }
232 
233     @Override
234     protected void doClose() throws Exception {
235         javaChannel().close();
236     }
237 
238     @Override
239     protected int doReadMessages(List<Object> buf) throws Exception {
240         DatagramChannel ch = javaChannel();
241         DatagramChannelConfig config = config();
242         RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
243 
244         ByteBuf data = allocHandle.allocate(config.getAllocator());
245         allocHandle.attemptedBytesRead(data.writableBytes());
246         boolean free = true;
247         try {
248             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
249             int pos = nioData.position();
250             InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
251             if (remoteAddress == null) {
252                 return 0;
253             }
254 
255             allocHandle.lastBytesRead(nioData.position() - pos);
256             buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
257                     localAddress(), remoteAddress));
258             free = false;
259             return 1;
260         } catch (Throwable cause) {
261             PlatformDependent.throwException(cause);
262             return -1;
263         }  finally {
264             if (free) {
265                 data.release();
266             }
267         }
268     }
269 
270     @Override
271     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
272         final SocketAddress remoteAddress;
273         final ByteBuf data;
274         if (msg instanceof AddressedEnvelope) {
275             @SuppressWarnings("unchecked")
276             AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
277             remoteAddress = envelope.recipient();
278             data = envelope.content();
279         } else {
280             data = (ByteBuf) msg;
281             remoteAddress = null;
282         }
283 
284         final int dataLen = data.readableBytes();
285         if (dataLen == 0) {
286             return true;
287         }
288 
289         final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
290         final int writtenBytes;
291         if (remoteAddress != null) {
292             writtenBytes = javaChannel().send(nioData, remoteAddress);
293         } else {
294             writtenBytes = javaChannel().write(nioData);
295         }
296         return writtenBytes > 0;
297     }
298 
299     @Override
300     protected Object filterOutboundMessage(Object msg) {
301         if (msg instanceof DatagramPacket) {
302             DatagramPacket p = (DatagramPacket) msg;
303             ByteBuf content = p.content();
304             if (isSingleDirectBuffer(content)) {
305                 return p;
306             }
307             return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
308         }
309 
310         if (msg instanceof ByteBuf) {
311             ByteBuf buf = (ByteBuf) msg;
312             if (isSingleDirectBuffer(buf)) {
313                 return buf;
314             }
315             return newDirectBuffer(buf);
316         }
317 
318         if (msg instanceof AddressedEnvelope) {
319             @SuppressWarnings("unchecked")
320             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
321             if (e.content() instanceof ByteBuf) {
322                 ByteBuf content = (ByteBuf) e.content();
323                 if (isSingleDirectBuffer(content)) {
324                     return e;
325                 }
326                 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
327             }
328         }
329 
330         throw new UnsupportedOperationException(
331                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
332     }
333 
334     /**
335      * Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
336      * (We check this because otherwise we need to make it a non-composite buffer.)
337      */
338     private static boolean isSingleDirectBuffer(ByteBuf buf) {
339         return buf.isDirect() && buf.nioBufferCount() == 1;
340     }
341 
    /**
     * Always returns {@code true}.
     */
    @Override
    protected boolean continueOnWriteError() {
        // Continue on write error as a DatagramChannel can write to multiple remote peers
        //
        // See https://github.com/netty/netty/issues/2665
        return true;
    }
349 
350     @Override
351     public InetSocketAddress localAddress() {
352         return (InetSocketAddress) super.localAddress();
353     }
354 
355     @Override
356     public InetSocketAddress remoteAddress() {
357         return (InetSocketAddress) super.remoteAddress();
358     }
359 
360     @Override
361     public ChannelFuture joinGroup(InetAddress multicastAddress) {
362         return joinGroup(multicastAddress, newPromise());
363     }
364 
365     @Override
366     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
367         try {
368             return joinGroup(
369                     multicastAddress,
370                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
371                     null, promise);
372         } catch (SocketException e) {
373             promise.setFailure(e);
374         }
375         return promise;
376     }
377 
378     @Override
379     public ChannelFuture joinGroup(
380             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
381         return joinGroup(multicastAddress, networkInterface, newPromise());
382     }
383 
384     @Override
385     public ChannelFuture joinGroup(
386             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
387             ChannelPromise promise) {
388         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
389     }
390 
391     @Override
392     public ChannelFuture joinGroup(
393             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
394         return joinGroup(multicastAddress, networkInterface, source, newPromise());
395     }
396 
397     @Override
398     public ChannelFuture joinGroup(
399             InetAddress multicastAddress, NetworkInterface networkInterface,
400             InetAddress source, ChannelPromise promise) {
401 
402         checkJavaVersion();
403 
404         if (multicastAddress == null) {
405             throw new NullPointerException("multicastAddress");
406         }
407 
408         if (networkInterface == null) {
409             throw new NullPointerException("networkInterface");
410         }
411 
412         try {
413             MembershipKey key;
414             if (source == null) {
415                 key = javaChannel().join(multicastAddress, networkInterface);
416             } else {
417                 key = javaChannel().join(multicastAddress, networkInterface, source);
418             }
419 
420             synchronized (this) {
421                 List<MembershipKey> keys = null;
422                 if (memberships == null) {
423                     memberships = new HashMap<InetAddress, List<MembershipKey>>();
424                 } else {
425                     keys = memberships.get(multicastAddress);
426                 }
427                 if (keys == null) {
428                     keys = new ArrayList<MembershipKey>();
429                     memberships.put(multicastAddress, keys);
430                 }
431                 keys.add(key);
432             }
433 
434             promise.setSuccess();
435         } catch (Throwable e) {
436             promise.setFailure(e);
437         }
438 
439         return promise;
440     }
441 
442     @Override
443     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
444         return leaveGroup(multicastAddress, newPromise());
445     }
446 
447     @Override
448     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
449         try {
450             return leaveGroup(
451                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
452         } catch (SocketException e) {
453             promise.setFailure(e);
454         }
455         return promise;
456     }
457 
458     @Override
459     public ChannelFuture leaveGroup(
460             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
461         return leaveGroup(multicastAddress, networkInterface, newPromise());
462     }
463 
464     @Override
465     public ChannelFuture leaveGroup(
466             InetSocketAddress multicastAddress,
467             NetworkInterface networkInterface, ChannelPromise promise) {
468         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
469     }
470 
471     @Override
472     public ChannelFuture leaveGroup(
473             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
474         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
475     }
476 
477     @Override
478     public ChannelFuture leaveGroup(
479             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
480             ChannelPromise promise) {
481         checkJavaVersion();
482 
483         if (multicastAddress == null) {
484             throw new NullPointerException("multicastAddress");
485         }
486         if (networkInterface == null) {
487             throw new NullPointerException("networkInterface");
488         }
489 
490         synchronized (this) {
491             if (memberships != null) {
492                 List<MembershipKey> keys = memberships.get(multicastAddress);
493                 if (keys != null) {
494                     Iterator<MembershipKey> keyIt = keys.iterator();
495 
496                     while (keyIt.hasNext()) {
497                         MembershipKey key = keyIt.next();
498                         if (networkInterface.equals(key.networkInterface())) {
499                            if (source == null && key.sourceAddress() == null ||
500                                source != null && source.equals(key.sourceAddress())) {
501                                key.drop();
502                                keyIt.remove();
503                            }
504                         }
505                     }
506                     if (keys.isEmpty()) {
507                         memberships.remove(multicastAddress);
508                     }
509                 }
510             }
511         }
512 
513         promise.setSuccess();
514         return promise;
515     }
516 
517     /**
518      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
519      */
520     @Override
521     public ChannelFuture block(
522             InetAddress multicastAddress, NetworkInterface networkInterface,
523             InetAddress sourceToBlock) {
524         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
525     }
526 
527     /**
528      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
529      */
530     @Override
531     public ChannelFuture block(
532             InetAddress multicastAddress, NetworkInterface networkInterface,
533             InetAddress sourceToBlock, ChannelPromise promise) {
534         checkJavaVersion();
535 
536         if (multicastAddress == null) {
537             throw new NullPointerException("multicastAddress");
538         }
539         if (sourceToBlock == null) {
540             throw new NullPointerException("sourceToBlock");
541         }
542 
543         if (networkInterface == null) {
544             throw new NullPointerException("networkInterface");
545         }
546         synchronized (this) {
547             if (memberships != null) {
548                 List<MembershipKey> keys = memberships.get(multicastAddress);
549                 for (MembershipKey key: keys) {
550                     if (networkInterface.equals(key.networkInterface())) {
551                         try {
552                             key.block(sourceToBlock);
553                         } catch (IOException e) {
554                             promise.setFailure(e);
555                         }
556                     }
557                 }
558             }
559         }
560         promise.setSuccess();
561         return promise;
562     }
563 
564     /**
565      * Block the given sourceToBlock address for the given multicastAddress
566      *
567      */
568     @Override
569     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
570         return block(multicastAddress, sourceToBlock, newPromise());
571     }
572 
573     /**
574      * Block the given sourceToBlock address for the given multicastAddress
575      *
576      */
577     @Override
578     public ChannelFuture block(
579             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
580         try {
581             return block(
582                     multicastAddress,
583                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
584                     sourceToBlock, promise);
585         } catch (SocketException e) {
586             promise.setFailure(e);
587         }
588         return promise;
589     }
590 
    /**
     * Forwards to the superclass implementation without adding any behavior.
     * NOTE(review): presumably overridden only to make the method reachable from other classes in this
     * package; confirm.
     */
    @Override
    @Deprecated
    protected void setReadPending(boolean readPending) {
        super.setReadPending(readPending);
    }
596 
    /**
     * Package-private bridge that invokes {@code clearReadPending()}.
     */
    void clearReadPending0() {
        clearReadPending();
    }
600 
601     @Override
602     protected boolean closeOnReadError(Throwable cause) {
603         // We do not want to close on SocketException when using DatagramChannel as we usually can continue receiving.
604         // See https://github.com/netty/netty/issues/5893
605         if (cause instanceof SocketException) {
606             return false;
607         }
608         return super.closeOnReadError(cause);
609     }
610 }