View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelException;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.nio.AbstractNioMessageChannel;
30  import io.netty.channel.socket.DatagramChannelConfig;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.socket.InternetProtocolFamily;
33  import io.netty.util.internal.SocketUtils;
34  import io.netty.util.internal.PlatformDependent;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.UnstableApi;
37  
38  import java.io.IOException;
39  import java.net.InetAddress;
40  import java.net.InetSocketAddress;
41  import java.net.NetworkInterface;
42  import java.net.SocketAddress;
43  import java.net.SocketException;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.DatagramChannel;
46  import java.nio.channels.MembershipKey;
47  import java.nio.channels.SelectionKey;
48  import java.nio.channels.spi.SelectorProvider;
49  import java.util.ArrayList;
50  import java.util.HashMap;
51  import java.util.Iterator;
52  import java.util.List;
53  import java.util.Map;
54  
55  /**
56   * An NIO datagram {@link Channel} that sends and receives an
57   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
58   *
59   * @see AddressedEnvelope
60   * @see DatagramPacket
61   */
62  public final class NioDatagramChannel
63          extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
64  
    // Channel metadata shared by all instances, created via ChannelMetadata(true).
    private static final ChannelMetadata METADATA = new ChannelMetadata(true);
    // Provider used by the constructors that do not receive an explicit SelectorProvider.
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    // Suffix listing the accepted message types; appended to the error thrown by filterOutboundMessage().
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
            StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
            StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(SocketAddress.class) + ">, " +
            StringUtil.simpleClassName(ByteBuf.class) + ')';

    // Configuration object bound to this channel; created in the constructor.
    private final DatagramChannelConfig config;

    // Multicast memberships keyed by group address; created lazily in joinGroup() and accessed
    // inside synchronized (this) blocks.
    private Map<InetAddress, List<MembershipKey>> memberships;
    // Receive-buffer allocator handle; created lazily on the first doReadMessages() call.
    private RecvByteBufAllocator.Handle allocHandle;
78  
79      private static DatagramChannel newSocket(SelectorProvider provider) {
80          try {
81              /**
82               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
83               *  {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
84               *
85               *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
86               */
87              return provider.openDatagramChannel();
88          } catch (IOException e) {
89              throw new ChannelException("Failed to open a socket.", e);
90          }
91      }
92  
93      private static DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
94          if (ipFamily == null) {
95              return newSocket(provider);
96          }
97  
98          checkJavaVersion();
99  
100         try {
101             return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
102         } catch (IOException e) {
103             throw new ChannelException("Failed to open a socket.", e);
104         }
105     }
106 
107     private static void checkJavaVersion() {
108         if (PlatformDependent.javaVersion() < 7) {
109             throw new UnsupportedOperationException("Only supported on java 7+.");
110         }
111     }
112 
    /**
     * Creates a new instance that uses the operating system's default {@link InternetProtocolFamily}
     * and the default {@link SelectorProvider}.
     */
    public NioDatagramChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
119 
    /**
     * Creates a new instance using the given {@link SelectorProvider}, which will use the
     * operating system's default {@link InternetProtocolFamily}.
     *
     * @param provider the provider used to open the underlying {@link DatagramChannel}
     */
    public NioDatagramChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }
127 
    /**
     * Creates a new instance using the given {@link InternetProtocolFamily} and the default
     * {@link SelectorProvider}. If {@code null} is passed, the operating system's default family is used.
     *
     * @param ipFamily the protocol family to use, or {@code null} for the default
     */
    public NioDatagramChannel(InternetProtocolFamily ipFamily) {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
    }
135 
    /**
     * Creates a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
     * If the {@link InternetProtocolFamily} is {@code null}, the operating system's default family is used.
     *
     * @param provider the provider used to open the underlying {@link DatagramChannel}
     * @param ipFamily the protocol family to use, or {@code null} for the default
     */
    public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
        this(newSocket(provider, ipFamily));
    }
144 
    /**
     * Creates a new instance from the given {@link DatagramChannel}.
     *
     * @param socket the already opened NIO channel to wrap
     */
    public NioDatagramChannel(DatagramChannel socket) {
        // No parent channel is passed (null); the channel is set up with OP_READ as its read interest.
        super(null, socket, SelectionKey.OP_READ);
        config = new NioDatagramChannelConfig(this, socket);
    }
152 
    /**
     * Returns the {@link ChannelMetadata} constant shared by all instances of this class.
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
157 
    /**
     * Returns the configuration object that was created for this channel in the constructor.
     */
    @Override
    public DatagramChannelConfig config() {
        return config;
    }
162 
163     @Override
164     @SuppressWarnings("deprecation")
165     public boolean isActive() {
166         DatagramChannel ch = javaChannel();
167         return ch.isOpen() && (
168                 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
169                 || ch.socket().isBound());
170     }
171 
172     @Override
173     public boolean isConnected() {
174         return javaChannel().isConnected();
175     }
176 
    /**
     * Returns the underlying channel obtained from the superclass, narrowed to {@link DatagramChannel}.
     */
    @Override
    protected DatagramChannel javaChannel() {
        return (DatagramChannel) super.javaChannel();
    }
181 
182     @Override
183     protected SocketAddress localAddress0() {
184         return javaChannel().socket().getLocalSocketAddress();
185     }
186 
187     @Override
188     protected SocketAddress remoteAddress0() {
189         return javaChannel().socket().getRemoteSocketAddress();
190     }
191 
    /**
     * Binds the underlying channel to {@code localAddress}. The work is done by {@code doBind0},
     * which {@code doConnect} also uses when it is given a local address.
     */
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        doBind0(localAddress);
    }
196 
197     private void doBind0(SocketAddress localAddress) throws Exception {
198         if (PlatformDependent.javaVersion() >= 7) {
199             SocketUtils.bind(javaChannel(), localAddress);
200         } else {
201             javaChannel().socket().bind(localAddress);
202         }
203     }
204 
205     @Override
206     protected boolean doConnect(SocketAddress remoteAddress,
207             SocketAddress localAddress) throws Exception {
208         if (localAddress != null) {
209             doBind0(localAddress);
210         }
211 
212         boolean success = false;
213         try {
214             javaChannel().connect(remoteAddress);
215             success = true;
216             return true;
217         } finally {
218             if (!success) {
219                 doClose();
220             }
221         }
222     }
223 
    /**
     * Always throws {@link Error}.
     */
    @Override
    protected void doFinishConnect() throws Exception {
        // NOTE(review): doConnect() in this class returns true whenever it completes normally, so this
        // method is presumably never expected to be invoked -- confirm against the superclass contract.
        throw new Error();
    }
228 
229     @Override
230     protected void doDisconnect() throws Exception {
231         javaChannel().disconnect();
232     }
233 
234     @Override
235     protected void doClose() throws Exception {
236         javaChannel().close();
237     }
238 
239     @Override
240     protected int doReadMessages(List<Object> buf) throws Exception {
241         DatagramChannel ch = javaChannel();
242         DatagramChannelConfig config = config();
243         RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
244         if (allocHandle == null) {
245             this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
246         }
247         ByteBuf data = allocHandle.allocate(config.getAllocator());
248         boolean free = true;
249         try {
250             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
251             int pos = nioData.position();
252             InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
253             if (remoteAddress == null) {
254                 return 0;
255             }
256 
257             int readBytes = nioData.position() - pos;
258             data.writerIndex(data.writerIndex() + readBytes);
259             allocHandle.record(readBytes);
260 
261             buf.add(new DatagramPacket(data, localAddress(), remoteAddress));
262             free = false;
263             return 1;
264         } catch (Throwable cause) {
265             PlatformDependent.throwException(cause);
266             return -1;
267         }  finally {
268             if (free) {
269                 data.release();
270             }
271         }
272     }
273 
274     @Override
275     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
276         final SocketAddress remoteAddress;
277         final ByteBuf data;
278         if (msg instanceof AddressedEnvelope) {
279             @SuppressWarnings("unchecked")
280             AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
281             remoteAddress = envelope.recipient();
282             data = envelope.content();
283         } else {
284             data = (ByteBuf) msg;
285             remoteAddress = null;
286         }
287 
288         final int dataLen = data.readableBytes();
289         if (dataLen == 0) {
290             return true;
291         }
292 
293         final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
294         final int writtenBytes;
295         if (remoteAddress != null) {
296             writtenBytes = javaChannel().send(nioData, remoteAddress);
297         } else {
298             writtenBytes = javaChannel().write(nioData);
299         }
300         return writtenBytes > 0;
301     }
302 
303     @Override
304     protected Object filterOutboundMessage(Object msg) {
305         if (msg instanceof DatagramPacket) {
306             DatagramPacket p = (DatagramPacket) msg;
307             ByteBuf content = p.content();
308             if (isSingleDirectBuffer(content)) {
309                 return p;
310             }
311             return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
312         }
313 
314         if (msg instanceof ByteBuf) {
315             ByteBuf buf = (ByteBuf) msg;
316             if (isSingleDirectBuffer(buf)) {
317                 return buf;
318             }
319             return newDirectBuffer(buf);
320         }
321 
322         if (msg instanceof AddressedEnvelope) {
323             @SuppressWarnings("unchecked")
324             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
325             if (e.content() instanceof ByteBuf) {
326                 ByteBuf content = (ByteBuf) e.content();
327                 if (isSingleDirectBuffer(content)) {
328                     return e;
329                 }
330                 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
331             }
332         }
333 
334         throw new UnsupportedOperationException(
335                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
336     }
337 
338     /**
339      * Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
340      * (We check this because otherwise we need to make it a non-composite buffer.)
341      */
342     private static boolean isSingleDirectBuffer(ByteBuf buf) {
343         return buf.isDirect() && buf.nioBufferCount() == 1;
344     }
345 
    /**
     * Always returns {@code true}.
     */
    @Override
    protected boolean continueOnWriteError() {
        // Continue on write error as a DatagramChannel can write to multiple remote peers
        //
        // See https://github.com/netty/netty/issues/2665
        return true;
    }
353 
    /**
     * Returns the local address provided by the superclass, narrowed to {@link InetSocketAddress}.
     */
    @Override
    public InetSocketAddress localAddress() {
        return (InetSocketAddress) super.localAddress();
    }
358 
    /**
     * Returns the remote address provided by the superclass, narrowed to {@link InetSocketAddress}.
     */
    @Override
    public InetSocketAddress remoteAddress() {
        return (InetSocketAddress) super.remoteAddress();
    }
363 
364     @Override
365     public ChannelFuture joinGroup(InetAddress multicastAddress) {
366         return joinGroup(multicastAddress, newPromise());
367     }
368 
369     @Override
370     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
371         try {
372             return joinGroup(
373                     multicastAddress,
374                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
375                     null, promise);
376         } catch (SocketException e) {
377             promise.setFailure(e);
378         }
379         return promise;
380     }
381 
382     @Override
383     public ChannelFuture joinGroup(
384             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
385         return joinGroup(multicastAddress, networkInterface, newPromise());
386     }
387 
388     @Override
389     public ChannelFuture joinGroup(
390             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
391             ChannelPromise promise) {
392         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
393     }
394 
395     @Override
396     public ChannelFuture joinGroup(
397             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
398         return joinGroup(multicastAddress, networkInterface, source, newPromise());
399     }
400 
401     @Override
402     public ChannelFuture joinGroup(
403             InetAddress multicastAddress, NetworkInterface networkInterface,
404             InetAddress source, ChannelPromise promise) {
405 
406         checkJavaVersion();
407 
408         if (multicastAddress == null) {
409             throw new NullPointerException("multicastAddress");
410         }
411 
412         if (networkInterface == null) {
413             throw new NullPointerException("networkInterface");
414         }
415 
416         try {
417             MembershipKey key;
418             if (source == null) {
419                 key = javaChannel().join(multicastAddress, networkInterface);
420             } else {
421                 key = javaChannel().join(multicastAddress, networkInterface, source);
422             }
423 
424             synchronized (this) {
425                 List<MembershipKey> keys = null;
426                 if (memberships == null) {
427                     memberships = new HashMap<InetAddress, List<MembershipKey>>();
428                 } else {
429                     keys = memberships.get(multicastAddress);
430                 }
431                 if (keys == null) {
432                     keys = new ArrayList<MembershipKey>();
433                     memberships.put(multicastAddress, keys);
434                 }
435                 keys.add(key);
436             }
437 
438             promise.setSuccess();
439         } catch (Throwable e) {
440             promise.setFailure(e);
441         }
442 
443         return promise;
444     }
445 
446     @Override
447     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
448         return leaveGroup(multicastAddress, newPromise());
449     }
450 
451     @Override
452     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
453         try {
454             return leaveGroup(
455                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
456         } catch (SocketException e) {
457             promise.setFailure(e);
458         }
459         return promise;
460     }
461 
462     @Override
463     public ChannelFuture leaveGroup(
464             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
465         return leaveGroup(multicastAddress, networkInterface, newPromise());
466     }
467 
468     @Override
469     public ChannelFuture leaveGroup(
470             InetSocketAddress multicastAddress,
471             NetworkInterface networkInterface, ChannelPromise promise) {
472         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
473     }
474 
475     @Override
476     public ChannelFuture leaveGroup(
477             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
478         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
479     }
480 
481     @Override
482     public ChannelFuture leaveGroup(
483             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
484             ChannelPromise promise) {
485         checkJavaVersion();
486 
487         if (multicastAddress == null) {
488             throw new NullPointerException("multicastAddress");
489         }
490         if (networkInterface == null) {
491             throw new NullPointerException("networkInterface");
492         }
493 
494         synchronized (this) {
495             if (memberships != null) {
496                 List<MembershipKey> keys = memberships.get(multicastAddress);
497                 if (keys != null) {
498                     Iterator<MembershipKey> keyIt = keys.iterator();
499 
500                     while (keyIt.hasNext()) {
501                         MembershipKey key = keyIt.next();
502                         if (networkInterface.equals(key.networkInterface())) {
503                            if (source == null && key.sourceAddress() == null ||
504                                source != null && source.equals(key.sourceAddress())) {
505                                key.drop();
506                                keyIt.remove();
507                            }
508                         }
509                     }
510                     if (keys.isEmpty()) {
511                         memberships.remove(multicastAddress);
512                     }
513                 }
514             }
515         }
516 
517         promise.setSuccess();
518         return promise;
519     }
520 
521     /**
522      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
523      */
524     @Override
525     public ChannelFuture block(
526             InetAddress multicastAddress, NetworkInterface networkInterface,
527             InetAddress sourceToBlock) {
528         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
529     }
530 
531     /**
532      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
533      */
534     @Override
535     public ChannelFuture block(
536             InetAddress multicastAddress, NetworkInterface networkInterface,
537             InetAddress sourceToBlock, ChannelPromise promise) {
538         checkJavaVersion();
539 
540         if (multicastAddress == null) {
541             throw new NullPointerException("multicastAddress");
542         }
543         if (sourceToBlock == null) {
544             throw new NullPointerException("sourceToBlock");
545         }
546 
547         if (networkInterface == null) {
548             throw new NullPointerException("networkInterface");
549         }
550         synchronized (this) {
551             if (memberships != null) {
552                 List<MembershipKey> keys = memberships.get(multicastAddress);
553                 for (MembershipKey key: keys) {
554                     if (networkInterface.equals(key.networkInterface())) {
555                         try {
556                             key.block(sourceToBlock);
557                         } catch (IOException e) {
558                             promise.setFailure(e);
559                         }
560                     }
561                 }
562             }
563         }
564         promise.setSuccess();
565         return promise;
566     }
567 
568     /**
569      * Block the given sourceToBlock address for the given multicastAddress
570      *
571      */
572     @Override
573     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
574         return block(multicastAddress, sourceToBlock, newPromise());
575     }
576 
577     /**
578      * Block the given sourceToBlock address for the given multicastAddress
579      *
580      */
581     @Override
582     public ChannelFuture block(
583             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
584         try {
585             return block(
586                     multicastAddress,
587                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
588                     sourceToBlock, promise);
589         } catch (SocketException e) {
590             promise.setFailure(e);
591         }
592         return promise;
593     }
594 
    /**
     * Delegates to the superclass implementation without changing its behavior.
     */
    @Override
    protected void setReadPending(boolean readPending) {
        // NOTE(review): presumably overridden only so that other classes in this package can call the
        // protected method -- confirm against the callers before removing.
        super.setReadPending(readPending);
    }
599 
600     @Override
601     protected boolean closeOnReadError(Throwable cause) {
602         // We do not want to close on SocketException when using DatagramChannel as we usually can continue receiving.
603         // See https://github.com/netty/netty/issues/5893
604         if (cause instanceof SocketException) {
605             return false;
606         }
607         return super.closeOnReadError(cause);
608     }
609 }