View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelException;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOption;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.nio.AbstractNioMessageChannel;
30  import io.netty.channel.socket.DatagramChannelConfig;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.socket.InternetProtocolFamily;
33  import io.netty.util.internal.PlatformDependent;
34  import io.netty.util.internal.StringUtil;
35  
36  import java.io.IOException;
37  import java.net.InetAddress;
38  import java.net.InetSocketAddress;
39  import java.net.NetworkInterface;
40  import java.net.SocketAddress;
41  import java.net.SocketException;
42  import java.nio.ByteBuffer;
43  import java.nio.channels.DatagramChannel;
44  import java.nio.channels.MembershipKey;
45  import java.nio.channels.SelectionKey;
46  import java.nio.channels.spi.SelectorProvider;
47  import java.util.ArrayList;
48  import java.util.HashMap;
49  import java.util.Iterator;
50  import java.util.List;
51  import java.util.Map;
52  
53  /**
54   * An NIO datagram {@link Channel} that sends and receives an
55   * {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
56   *
57   * @see AddressedEnvelope
58   * @see DatagramPacket
59   */
60  public final class NioDatagramChannel
61          extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
62  
63      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
64      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
65      private static final String EXPECTED_TYPES =
66              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
67              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
68              StringUtil.simpleClassName(ByteBuf.class) + ", " +
69              StringUtil.simpleClassName(SocketAddress.class) + ">, " +
70              StringUtil.simpleClassName(ByteBuf.class) + ')';
71  
72      private final DatagramChannelConfig config;
73  
74      private Map<InetAddress, List<MembershipKey>> memberships;
75  
76      private static DatagramChannel newSocket(SelectorProvider provider) {
77          try {
78              /**
79               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
80               *  {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
81               *
82               *  See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
83               */
84              return provider.openDatagramChannel();
85          } catch (IOException e) {
86              throw new ChannelException("Failed to open a socket.", e);
87          }
88      }
89  
90      private static DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
91          if (ipFamily == null) {
92              return newSocket(provider);
93          }
94  
95          checkJavaVersion();
96  
97          try {
98              return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
99          } catch (IOException e) {
100             throw new ChannelException("Failed to open a socket.", e);
101         }
102     }
103 
104     private static void checkJavaVersion() {
105         if (PlatformDependent.javaVersion() < 7) {
106             throw new UnsupportedOperationException("Only supported on java 7+.");
107         }
108     }
109 
110     /**
111      * Create a new instance which will use the Operation Systems default {@link InternetProtocolFamily}.
112      */
113     public NioDatagramChannel() {
114         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
115     }
116 
117     /**
118      * Create a new instance using the given {@link SelectorProvider}
119      * which will use the Operation Systems default {@link InternetProtocolFamily}.
120      */
121     public NioDatagramChannel(SelectorProvider provider) {
122         this(newSocket(provider));
123     }
124 
125     /**
126      * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
127      * on the Operation Systems default which will be chosen.
128      */
129     public NioDatagramChannel(InternetProtocolFamily ipFamily) {
130         this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
131     }
132 
133     /**
134      * Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
135      * If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default
136      * which will be chosen.
137      */
138     public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
139         this(newSocket(provider, ipFamily));
140     }
141 
142     /**
143      * Create a new instance from the given {@link DatagramChannel}.
144      */
145     public NioDatagramChannel(DatagramChannel socket) {
146         super(null, socket, SelectionKey.OP_READ);
147         config = new NioDatagramChannelConfig(this, socket);
148     }
149 
150     @Override
151     public ChannelMetadata metadata() {
152         return METADATA;
153     }
154 
155     @Override
156     public DatagramChannelConfig config() {
157         return config;
158     }
159 
160     @Override
161     @SuppressWarnings("deprecation")
162     public boolean isActive() {
163         DatagramChannel ch = javaChannel();
164         return ch.isOpen() && (
165                 config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
166                 || ch.socket().isBound());
167     }
168 
169     @Override
170     public boolean isConnected() {
171         return javaChannel().isConnected();
172     }
173 
174     @Override
175     protected DatagramChannel javaChannel() {
176         return (DatagramChannel) super.javaChannel();
177     }
178 
179     @Override
180     protected SocketAddress localAddress0() {
181         return javaChannel().socket().getLocalSocketAddress();
182     }
183 
184     @Override
185     protected SocketAddress remoteAddress0() {
186         return javaChannel().socket().getRemoteSocketAddress();
187     }
188 
189     @Override
190     protected void doBind(SocketAddress localAddress) throws Exception {
191         javaChannel().socket().bind(localAddress);
192     }
193 
194     @Override
195     protected boolean doConnect(SocketAddress remoteAddress,
196             SocketAddress localAddress) throws Exception {
197         if (localAddress != null) {
198             javaChannel().socket().bind(localAddress);
199         }
200 
201         boolean success = false;
202         try {
203             javaChannel().connect(remoteAddress);
204             success = true;
205             return true;
206         } finally {
207             if (!success) {
208                 doClose();
209             }
210         }
211     }
212 
213     @Override
214     protected void doFinishConnect() throws Exception {
215         throw new Error();
216     }
217 
218     @Override
219     protected void doDisconnect() throws Exception {
220         javaChannel().disconnect();
221     }
222 
223     @Override
224     protected void doClose() throws Exception {
225         javaChannel().close();
226     }
227 
228     @Override
229     protected int doReadMessages(List<Object> buf) throws Exception {
230         DatagramChannel ch = javaChannel();
231         DatagramChannelConfig config = config();
232         RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
233 
234         ByteBuf data = allocHandle.allocate(config.getAllocator());
235         boolean free = true;
236         try {
237             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
238             int pos = nioData.position();
239             InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
240             if (remoteAddress == null) {
241                 return 0;
242             }
243 
244             int readBytes = nioData.position() - pos;
245             data.writerIndex(data.writerIndex() + readBytes);
246             allocHandle.record(readBytes);
247 
248             buf.add(new DatagramPacket(data, localAddress(), remoteAddress));
249             free = false;
250             return 1;
251         } catch (Throwable cause) {
252             PlatformDependent.throwException(cause);
253             return -1;
254         }  finally {
255             if (free) {
256                 data.release();
257             }
258         }
259     }
260 
261     @Override
262     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
263         final SocketAddress remoteAddress;
264         final ByteBuf data;
265         if (msg instanceof AddressedEnvelope) {
266             @SuppressWarnings("unchecked")
267             AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
268             remoteAddress = envelope.recipient();
269             data = envelope.content();
270         } else {
271             data = (ByteBuf) msg;
272             remoteAddress = null;
273         }
274 
275         final int dataLen = data.readableBytes();
276         if (dataLen == 0) {
277             return true;
278         }
279 
280         final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
281         final int writtenBytes;
282         if (remoteAddress != null) {
283             writtenBytes = javaChannel().send(nioData, remoteAddress);
284         } else {
285             writtenBytes = javaChannel().write(nioData);
286         }
287         return writtenBytes > 0;
288     }
289 
290     @Override
291     protected Object filterOutboundMessage(Object msg) {
292         if (msg instanceof DatagramPacket) {
293             DatagramPacket p = (DatagramPacket) msg;
294             ByteBuf content = p.content();
295             if (isSingleDirectBuffer(content)) {
296                 return p;
297             }
298             return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
299         }
300 
301         if (msg instanceof ByteBuf) {
302             ByteBuf buf = (ByteBuf) msg;
303             if (isSingleDirectBuffer(buf)) {
304                 return buf;
305             }
306             return newDirectBuffer(buf);
307         }
308 
309         if (msg instanceof AddressedEnvelope) {
310             @SuppressWarnings("unchecked")
311             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
312             if (e.content() instanceof ByteBuf) {
313                 ByteBuf content = (ByteBuf) e.content();
314                 if (isSingleDirectBuffer(content)) {
315                     return e;
316                 }
317                 return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
318             }
319         }
320 
321         throw new UnsupportedOperationException(
322                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
323     }
324 
325     /**
326      * Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
327      * (We check this because otherwise we need to make it a non-composite buffer.)
328      */
329     private static boolean isSingleDirectBuffer(ByteBuf buf) {
330         return buf.isDirect() && buf.nioBufferCount() == 1;
331     }
332 
333     @Override
334     protected boolean continueOnWriteError() {
335         // Continue on write error as a DatagramChannel can write to multiple remote peers
336         //
337         // See https://github.com/netty/netty/issues/2665
338         return true;
339     }
340 
341     @Override
342     public InetSocketAddress localAddress() {
343         return (InetSocketAddress) super.localAddress();
344     }
345 
346     @Override
347     public InetSocketAddress remoteAddress() {
348         return (InetSocketAddress) super.remoteAddress();
349     }
350 
351     @Override
352     public ChannelFuture joinGroup(InetAddress multicastAddress) {
353         return joinGroup(multicastAddress, newPromise());
354     }
355 
356     @Override
357     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
358         try {
359             return joinGroup(
360                     multicastAddress,
361                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
362                     null, promise);
363         } catch (SocketException e) {
364             promise.setFailure(e);
365         }
366         return promise;
367     }
368 
369     @Override
370     public ChannelFuture joinGroup(
371             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
372         return joinGroup(multicastAddress, networkInterface, newPromise());
373     }
374 
375     @Override
376     public ChannelFuture joinGroup(
377             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
378             ChannelPromise promise) {
379         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
380     }
381 
382     @Override
383     public ChannelFuture joinGroup(
384             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
385         return joinGroup(multicastAddress, networkInterface, source, newPromise());
386     }
387 
388     @Override
389     public ChannelFuture joinGroup(
390             InetAddress multicastAddress, NetworkInterface networkInterface,
391             InetAddress source, ChannelPromise promise) {
392 
393         checkJavaVersion();
394 
395         if (multicastAddress == null) {
396             throw new NullPointerException("multicastAddress");
397         }
398 
399         if (networkInterface == null) {
400             throw new NullPointerException("networkInterface");
401         }
402 
403         try {
404             MembershipKey key;
405             if (source == null) {
406                 key = javaChannel().join(multicastAddress, networkInterface);
407             } else {
408                 key = javaChannel().join(multicastAddress, networkInterface, source);
409             }
410 
411             synchronized (this) {
412                 List<MembershipKey> keys = null;
413                 if (memberships == null) {
414                     memberships = new HashMap<InetAddress, List<MembershipKey>>();
415                 } else {
416                     keys = memberships.get(multicastAddress);
417                 }
418                 if (keys == null) {
419                     keys = new ArrayList<MembershipKey>();
420                     memberships.put(multicastAddress, keys);
421                 }
422                 keys.add(key);
423             }
424 
425             promise.setSuccess();
426         } catch (Throwable e) {
427             promise.setFailure(e);
428         }
429 
430         return promise;
431     }
432 
433     @Override
434     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
435         return leaveGroup(multicastAddress, newPromise());
436     }
437 
438     @Override
439     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
440         try {
441             return leaveGroup(
442                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
443         } catch (SocketException e) {
444             promise.setFailure(e);
445         }
446         return promise;
447     }
448 
449     @Override
450     public ChannelFuture leaveGroup(
451             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
452         return leaveGroup(multicastAddress, networkInterface, newPromise());
453     }
454 
455     @Override
456     public ChannelFuture leaveGroup(
457             InetSocketAddress multicastAddress,
458             NetworkInterface networkInterface, ChannelPromise promise) {
459         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
460     }
461 
462     @Override
463     public ChannelFuture leaveGroup(
464             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
465         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
466     }
467 
468     @Override
469     public ChannelFuture leaveGroup(
470             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
471             ChannelPromise promise) {
472         checkJavaVersion();
473 
474         if (multicastAddress == null) {
475             throw new NullPointerException("multicastAddress");
476         }
477         if (networkInterface == null) {
478             throw new NullPointerException("networkInterface");
479         }
480 
481         synchronized (this) {
482             if (memberships != null) {
483                 List<MembershipKey> keys = memberships.get(multicastAddress);
484                 if (keys != null) {
485                     Iterator<MembershipKey> keyIt = keys.iterator();
486 
487                     while (keyIt.hasNext()) {
488                         MembershipKey key = keyIt.next();
489                         if (networkInterface.equals(key.networkInterface())) {
490                            if (source == null && key.sourceAddress() == null ||
491                                source != null && source.equals(key.sourceAddress())) {
492                                key.drop();
493                                keyIt.remove();
494                            }
495                         }
496                     }
497                     if (keys.isEmpty()) {
498                         memberships.remove(multicastAddress);
499                     }
500                 }
501             }
502         }
503 
504         promise.setSuccess();
505         return promise;
506     }
507 
508     /**
509      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
510      */
511     @Override
512     public ChannelFuture block(
513             InetAddress multicastAddress, NetworkInterface networkInterface,
514             InetAddress sourceToBlock) {
515         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
516     }
517 
518     /**
519      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
520      */
521     @Override
522     public ChannelFuture block(
523             InetAddress multicastAddress, NetworkInterface networkInterface,
524             InetAddress sourceToBlock, ChannelPromise promise) {
525         checkJavaVersion();
526 
527         if (multicastAddress == null) {
528             throw new NullPointerException("multicastAddress");
529         }
530         if (sourceToBlock == null) {
531             throw new NullPointerException("sourceToBlock");
532         }
533 
534         if (networkInterface == null) {
535             throw new NullPointerException("networkInterface");
536         }
537         synchronized (this) {
538             if (memberships != null) {
539                 List<MembershipKey> keys = memberships.get(multicastAddress);
540                 for (MembershipKey key: keys) {
541                     if (networkInterface.equals(key.networkInterface())) {
542                         try {
543                             key.block(sourceToBlock);
544                         } catch (IOException e) {
545                             promise.setFailure(e);
546                         }
547                     }
548                 }
549             }
550         }
551         promise.setSuccess();
552         return promise;
553     }
554 
555     /**
556      * Block the given sourceToBlock address for the given multicastAddress
557      *
558      */
559     @Override
560     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
561         return block(multicastAddress, sourceToBlock, newPromise());
562     }
563 
564     /**
565      * Block the given sourceToBlock address for the given multicastAddress
566      *
567      */
568     @Override
569     public ChannelFuture block(
570             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
571         try {
572             return block(
573                     multicastAddress,
574                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
575                     sourceToBlock, promise);
576         } catch (SocketException e) {
577             promise.setFailure(e);
578         }
579         return promise;
580     }
581 
582     @Override
583     protected void setReadPending(boolean readPending) {
584         super.setReadPending(readPending);
585     }
586 }