View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelException;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.nio.AbstractNioMessageChannel;
30  import io.netty.channel.socket.DatagramChannelConfig;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.socket.InternetProtocolFamily;
33  import io.netty.util.internal.SocketUtils;
34  import io.netty.util.internal.PlatformDependent;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.UnstableApi;
37  
38  import java.io.IOException;
39  import java.net.InetAddress;
40  import java.net.InetSocketAddress;
41  import java.net.NetworkInterface;
42  import java.net.SocketAddress;
43  import java.net.SocketException;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.DatagramChannel;
46  import java.nio.channels.MembershipKey;
47  import java.nio.channels.SelectionKey;
48  import java.nio.channels.spi.SelectorProvider;
49  import java.util.ArrayList;
50  import java.util.HashMap;
51  import java.util.Iterator;
52  import java.util.List;
53  import java.util.Map;
54  
55  /**
56   * An NIO datagram {@link Channel} that sends and receives an
57   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
58   *
59   * @see AddressedEnvelope
60   * @see DatagramPacket
61   */
62  public final class NioDatagramChannel
63          extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
64  
65      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
66      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
67      private static final String EXPECTED_TYPES =
68              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
69              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
70              StringUtil.simpleClassName(ByteBuf.class) + ", " +
71              StringUtil.simpleClassName(SocketAddress.class) + ">, " +
72              StringUtil.simpleClassName(ByteBuf.class) + ')';
73  
74      private final DatagramChannelConfig config;
75  
76      private Map<InetAddress, List<MembershipKey>> memberships;
77  
78      private static DatagramChannel newSocket(SelectorProvider provider) {
79          try {
80              /**
81               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
82               *  {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
83               *
84               *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
85               */
86              return provider.openDatagramChannel();
87          } catch (IOException e) {
88              throw new ChannelException("Failed to open a socket.", e);
89          }
90      }
91  
92      private static DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
93          if (ipFamily == null) {
94              return newSocket(provider);
95          }
96  
97          checkJavaVersion();
98  
99          try {
100             return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
101         } catch (IOException e) {
102             throw new ChannelException("Failed to open a socket.", e);
103         }
104     }
105 
106     private static void checkJavaVersion() {
107         if (PlatformDependent.javaVersion() < 7) {
108             throw new UnsupportedOperationException("Only supported on java 7+.");
109         }
110     }
111 
112     /**
113      * Create a new instance which will use the Operation Systems default {@link InternetProtocolFamily}.
114      */
115     public NioDatagramChannel() {
116         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
117     }
118 
119     /**
120      * Create a new instance using the given {@link SelectorProvider}
121      * which will use the Operation Systems default {@link InternetProtocolFamily}.
122      */
123     public NioDatagramChannel(SelectorProvider provider) {
124         this(newSocket(provider));
125     }
126 
127     /**
128      * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
129      * on the Operation Systems default which will be chosen.
130      */
131     public NioDatagramChannel(InternetProtocolFamily ipFamily) {
132         this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
133     }
134 
135     /**
136      * Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
137      * If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default
138      * which will be chosen.
139      */
140     public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
141         this(newSocket(provider, ipFamily));
142     }
143 
144     /**
145      * Create a new instance from the given {@link DatagramChannel}.
146      */
147     public NioDatagramChannel(DatagramChannel socket) {
148         super(null, socket, SelectionKey.OP_READ);
149         config = new NioDatagramChannelConfig(this, socket);
150     }
151 
152     @Override
153     public ChannelMetadata metadata() {
154         return METADATA;
155     }
156 
157     @Override
158     public DatagramChannelConfig config() {
159         return config;
160     }
161 
162     @Override
163     @SuppressWarnings("deprecation")
164     public boolean isActive() {
165         DatagramChannel ch = javaChannel();
166         return ch.isOpen() && (
167                 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
168                 || ch.socket().isBound());
169     }
170 
171     @Override
172     public boolean isConnected() {
173         return javaChannel().isConnected();
174     }
175 
176     @Override
177     protected DatagramChannel javaChannel() {
178         return (DatagramChannel) super.javaChannel();
179     }
180 
181     @Override
182     protected SocketAddress localAddress0() {
183         return javaChannel().socket().getLocalSocketAddress();
184     }
185 
186     @Override
187     protected SocketAddress remoteAddress0() {
188         return javaChannel().socket().getRemoteSocketAddress();
189     }
190 
191     @Override
192     protected void doBind(SocketAddress localAddress) throws Exception {
193         doBind0(localAddress);
194     }
195 
196     private void doBind0(SocketAddress localAddress) throws Exception {
197         if (PlatformDependent.javaVersion() >= 7) {
198             SocketUtils.bind(javaChannel(), localAddress);
199         } else {
200             javaChannel().socket().bind(localAddress);
201         }
202     }
203 
204     @Override
205     protected boolean doConnect(SocketAddress remoteAddress,
206             SocketAddress localAddress) throws Exception {
207         if (localAddress != null) {
208             doBind0(localAddress);
209         }
210 
211         boolean success = false;
212         try {
213             javaChannel().connect(remoteAddress);
214             success = true;
215             return true;
216         } finally {
217             if (!success) {
218                 doClose();
219             }
220         }
221     }
222 
223     @Override
224     protected void doFinishConnect() throws Exception {
225         throw new Error();
226     }
227 
228     @Override
229     protected void doDisconnect() throws Exception {
230         javaChannel().disconnect();
231     }
232 
233     @Override
234     protected void doClose() throws Exception {
235         javaChannel().close();
236     }
237 
238     @Override
239     protected int doReadMessages(List<Object> buf) throws Exception {
240         DatagramChannel ch = javaChannel();
241         DatagramChannelConfig config = config();
242         RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
243 
244         ByteBuf data = allocHandle.allocate(config.getAllocator());
245         allocHandle.attemptedBytesRead(data.writableBytes());
246         boolean free = true;
247         try {
248             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
249             int pos = nioData.position();
250             InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
251             if (remoteAddress == null) {
252                 return 0;
253             }
254 
255             allocHandle.lastBytesRead(nioData.position() - pos);
256             buf.add(new DatagramPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
257                     localAddress(), remoteAddress));
258             free = false;
259             return 1;
260         } catch (Throwable cause) {
261             PlatformDependent.throwException(cause);
262             return -1;
263         }  finally {
264             if (free) {
265                 data.release();
266             }
267         }
268     }
269 
270     @Override
271     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
272         final SocketAddress remoteAddress;
273         final ByteBuf data;
274         if (msg instanceof AddressedEnvelope) {
275             @SuppressWarnings("unchecked")
276             AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
277             remoteAddress = envelope.recipient();
278             data = envelope.content();
279         } else {
280             data = (ByteBuf) msg;
281             remoteAddress = null;
282         }
283 
284         final int dataLen = data.readableBytes();
285         if (dataLen == 0) {
286             return true;
287         }
288 
289         final ByteBuffer nioData = data.nioBufferCount() == 1 ? data.internalNioBuffer(data.readerIndex(), dataLen)
290                                                               : data.nioBuffer(data.readerIndex(), dataLen);
291         final int writtenBytes;
292         if (remoteAddress != null) {
293             writtenBytes = javaChannel().send(nioData, remoteAddress);
294         } else {
295             writtenBytes = javaChannel().write(nioData);
296         }
297         return writtenBytes > 0;
298     }
299 
300     @Override
301     protected Object filterOutboundMessage(Object msg) {
302         if (msg instanceof DatagramPacket) {
303             DatagramPacket p = (DatagramPacket) msg;
304             ByteBuf content = p.content();
305             if (isSingleDirectBuffer(content)) {
306                 return p;
307             }
308             return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
309         }
310 
311         if (msg instanceof ByteBuf) {
312             ByteBuf buf = (ByteBuf) msg;
313             if (isSingleDirectBuffer(buf)) {
314                 return buf;
315             }
316             return newDirectBuffer(buf);
317         }
318 
319         if (msg instanceof AddressedEnvelope) {
320             @SuppressWarnings("unchecked")
321             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
322             if (e.content() instanceof ByteBuf) {
323                 ByteBuf content = (ByteBuf) e.content();
324                 if (isSingleDirectBuffer(content)) {
325                     return e;
326                 }
327                 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
328             }
329         }
330 
331         throw new UnsupportedOperationException(
332                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
333     }
334 
335     /**
336      * Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
337      * (We check this because otherwise we need to make it a non-composite buffer.)
338      */
339     private static boolean isSingleDirectBuffer(ByteBuf buf) {
340         return buf.isDirect() && buf.nioBufferCount() == 1;
341     }
342 
343     @Override
344     protected boolean continueOnWriteError() {
345         // Continue on write error as a DatagramChannel can write to multiple remote peers
346         //
347         // See https://github.com/netty/netty/issues/2665
348         return true;
349     }
350 
351     @Override
352     public InetSocketAddress localAddress() {
353         return (InetSocketAddress) super.localAddress();
354     }
355 
356     @Override
357     public InetSocketAddress remoteAddress() {
358         return (InetSocketAddress) super.remoteAddress();
359     }
360 
361     @Override
362     public ChannelFuture joinGroup(InetAddress multicastAddress) {
363         return joinGroup(multicastAddress, newPromise());
364     }
365 
366     @Override
367     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
368         try {
369             return joinGroup(
370                     multicastAddress,
371                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
372                     null, promise);
373         } catch (SocketException e) {
374             promise.setFailure(e);
375         }
376         return promise;
377     }
378 
379     @Override
380     public ChannelFuture joinGroup(
381             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
382         return joinGroup(multicastAddress, networkInterface, newPromise());
383     }
384 
385     @Override
386     public ChannelFuture joinGroup(
387             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
388             ChannelPromise promise) {
389         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
390     }
391 
392     @Override
393     public ChannelFuture joinGroup(
394             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
395         return joinGroup(multicastAddress, networkInterface, source, newPromise());
396     }
397 
398     @Override
399     public ChannelFuture joinGroup(
400             InetAddress multicastAddress, NetworkInterface networkInterface,
401             InetAddress source, ChannelPromise promise) {
402 
403         checkJavaVersion();
404 
405         if (multicastAddress == null) {
406             throw new NullPointerException("multicastAddress");
407         }
408 
409         if (networkInterface == null) {
410             throw new NullPointerException("networkInterface");
411         }
412 
413         try {
414             MembershipKey key;
415             if (source == null) {
416                 key = javaChannel().join(multicastAddress, networkInterface);
417             } else {
418                 key = javaChannel().join(multicastAddress, networkInterface, source);
419             }
420 
421             synchronized (this) {
422                 List<MembershipKey> keys = null;
423                 if (memberships == null) {
424                     memberships = new HashMap<InetAddress, List<MembershipKey>>();
425                 } else {
426                     keys = memberships.get(multicastAddress);
427                 }
428                 if (keys == null) {
429                     keys = new ArrayList<MembershipKey>();
430                     memberships.put(multicastAddress, keys);
431                 }
432                 keys.add(key);
433             }
434 
435             promise.setSuccess();
436         } catch (Throwable e) {
437             promise.setFailure(e);
438         }
439 
440         return promise;
441     }
442 
443     @Override
444     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
445         return leaveGroup(multicastAddress, newPromise());
446     }
447 
448     @Override
449     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
450         try {
451             return leaveGroup(
452                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
453         } catch (SocketException e) {
454             promise.setFailure(e);
455         }
456         return promise;
457     }
458 
459     @Override
460     public ChannelFuture leaveGroup(
461             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
462         return leaveGroup(multicastAddress, networkInterface, newPromise());
463     }
464 
465     @Override
466     public ChannelFuture leaveGroup(
467             InetSocketAddress multicastAddress,
468             NetworkInterface networkInterface, ChannelPromise promise) {
469         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
470     }
471 
472     @Override
473     public ChannelFuture leaveGroup(
474             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
475         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
476     }
477 
478     @Override
479     public ChannelFuture leaveGroup(
480             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
481             ChannelPromise promise) {
482         checkJavaVersion();
483 
484         if (multicastAddress == null) {
485             throw new NullPointerException("multicastAddress");
486         }
487         if (networkInterface == null) {
488             throw new NullPointerException("networkInterface");
489         }
490 
491         synchronized (this) {
492             if (memberships != null) {
493                 List<MembershipKey> keys = memberships.get(multicastAddress);
494                 if (keys != null) {
495                     Iterator<MembershipKey> keyIt = keys.iterator();
496 
497                     while (keyIt.hasNext()) {
498                         MembershipKey key = keyIt.next();
499                         if (networkInterface.equals(key.networkInterface())) {
500                            if (source == null && key.sourceAddress() == null ||
501                                source != null && source.equals(key.sourceAddress())) {
502                                key.drop();
503                                keyIt.remove();
504                            }
505                         }
506                     }
507                     if (keys.isEmpty()) {
508                         memberships.remove(multicastAddress);
509                     }
510                 }
511             }
512         }
513 
514         promise.setSuccess();
515         return promise;
516     }
517 
518     /**
519      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
520      */
521     @Override
522     public ChannelFuture block(
523             InetAddress multicastAddress, NetworkInterface networkInterface,
524             InetAddress sourceToBlock) {
525         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
526     }
527 
528     /**
529      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
530      */
531     @Override
532     public ChannelFuture block(
533             InetAddress multicastAddress, NetworkInterface networkInterface,
534             InetAddress sourceToBlock, ChannelPromise promise) {
535         checkJavaVersion();
536 
537         if (multicastAddress == null) {
538             throw new NullPointerException("multicastAddress");
539         }
540         if (sourceToBlock == null) {
541             throw new NullPointerException("sourceToBlock");
542         }
543 
544         if (networkInterface == null) {
545             throw new NullPointerException("networkInterface");
546         }
547         synchronized (this) {
548             if (memberships != null) {
549                 List<MembershipKey> keys = memberships.get(multicastAddress);
550                 for (MembershipKey key: keys) {
551                     if (networkInterface.equals(key.networkInterface())) {
552                         try {
553                             key.block(sourceToBlock);
554                         } catch (IOException e) {
555                             promise.setFailure(e);
556                         }
557                     }
558                 }
559             }
560         }
561         promise.setSuccess();
562         return promise;
563     }
564 
565     /**
566      * Block the given sourceToBlock address for the given multicastAddress
567      *
568      */
569     @Override
570     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
571         return block(multicastAddress, sourceToBlock, newPromise());
572     }
573 
574     /**
575      * Block the given sourceToBlock address for the given multicastAddress
576      *
577      */
578     @Override
579     public ChannelFuture block(
580             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
581         try {
582             return block(
583                     multicastAddress,
584                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
585                     sourceToBlock, promise);
586         } catch (SocketException e) {
587             promise.setFailure(e);
588         }
589         return promise;
590     }
591 
592     @Override
593     @Deprecated
594     protected void setReadPending(boolean readPending) {
595         super.setReadPending(readPending);
596     }
597 
598     void clearReadPending0() {
599         clearReadPending();
600     }
601 
602     @Override
603     protected boolean closeOnReadError(Throwable cause) {
604         // We do not want to close on SocketException when using DatagramChannel as we usually can continue receiving.
605         // See https://github.com/netty/netty/issues/5893
606         if (cause instanceof SocketException) {
607             return false;
608         }
609         return super.closeOnReadError(cause);
610     }
611 }