View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.CompositeByteBuf;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.socket.DatagramChannel;
30  import io.netty.channel.socket.DatagramChannelConfig;
31  import io.netty.channel.socket.DatagramPacket;
32  import io.netty.channel.unix.FileDescriptor;
33  import io.netty.util.internal.PlatformDependent;
34  import io.netty.util.internal.StringUtil;
35  
36  import java.io.IOException;
37  import java.net.InetAddress;
38  import java.net.InetSocketAddress;
39  import java.net.NetworkInterface;
40  import java.net.SocketAddress;
41  import java.net.SocketException;
42  import java.nio.ByteBuffer;
43  import java.nio.channels.NotYetConnectedException;
44  import java.util.ArrayList;
45  import java.util.List;
46  
47  /**
48   * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
49   * maximal performance.
50   */
51  public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
52      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
53      private static final String EXPECTED_TYPES =
54              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
55              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
56              StringUtil.simpleClassName(ByteBuf.class) + ", " +
57              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
58              StringUtil.simpleClassName(ByteBuf.class) + ')';
59  
60      private volatile InetSocketAddress local;
61      private volatile InetSocketAddress remote;
62      private volatile boolean connected;
63      private final EpollDatagramChannelConfig config;
64  
65      public EpollDatagramChannel() {
66          super(Native.socketDgramFd(), Native.EPOLLIN);
67          config = new EpollDatagramChannelConfig(this);
68      }
69  
70      /**
71       * Create a new {@link EpollDatagramChannel} from the given {@link FileDescriptor}.
72       */
73      public EpollDatagramChannel(FileDescriptor fd) {
74          super(null, fd, Native.EPOLLIN, true);
75          config = new EpollDatagramChannelConfig(this);
76  
77          // As we create an EpollDatagramChannel from a FileDescriptor we should try to obtain the remote and local
78          // address from it. This is needed as the FileDescriptor may be bound already.
79          local = Native.localAddress(fd.intValue());
80      }
81  
82      @Override
83      public InetSocketAddress remoteAddress() {
84          return (InetSocketAddress) super.remoteAddress();
85      }
86  
87      @Override
88      public InetSocketAddress localAddress() {
89          return (InetSocketAddress) super.localAddress();
90      }
91  
92      @Override
93      public ChannelMetadata metadata() {
94          return METADATA;
95      }
96  
97      @Override
98      @SuppressWarnings("deprecation")
99      public boolean isActive() {
100         return fd().isOpen() &&
101                 (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
102                         || active);
103     }
104 
105     @Override
106     public boolean isConnected() {
107         return connected;
108     }
109 
110     @Override
111     public ChannelFuture joinGroup(InetAddress multicastAddress) {
112         return joinGroup(multicastAddress, newPromise());
113     }
114 
115     @Override
116     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
117         try {
118             return joinGroup(
119                     multicastAddress,
120                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
121                     null, promise);
122         } catch (SocketException e) {
123             promise.setFailure(e);
124         }
125         return promise;
126     }
127 
128     @Override
129     public ChannelFuture joinGroup(
130             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
131         return joinGroup(multicastAddress, networkInterface, newPromise());
132     }
133 
134     @Override
135     public ChannelFuture joinGroup(
136             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
137             ChannelPromise promise) {
138         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
139     }
140 
141     @Override
142     public ChannelFuture joinGroup(
143             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
144         return joinGroup(multicastAddress, networkInterface, source, newPromise());
145     }
146 
147     @Override
148     public ChannelFuture joinGroup(
149             final InetAddress multicastAddress, final NetworkInterface networkInterface,
150             final InetAddress source, final ChannelPromise promise) {
151 
152         if (multicastAddress == null) {
153             throw new NullPointerException("multicastAddress");
154         }
155 
156         if (networkInterface == null) {
157             throw new NullPointerException("networkInterface");
158         }
159 
160         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
161         return promise;
162     }
163 
164     @Override
165     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
166         return leaveGroup(multicastAddress, newPromise());
167     }
168 
169     @Override
170     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
171         try {
172             return leaveGroup(
173                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
174         } catch (SocketException e) {
175             promise.setFailure(e);
176         }
177         return promise;
178     }
179 
180     @Override
181     public ChannelFuture leaveGroup(
182             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
183         return leaveGroup(multicastAddress, networkInterface, newPromise());
184     }
185 
186     @Override
187     public ChannelFuture leaveGroup(
188             InetSocketAddress multicastAddress,
189             NetworkInterface networkInterface, ChannelPromise promise) {
190         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
191     }
192 
193     @Override
194     public ChannelFuture leaveGroup(
195             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
196         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
197     }
198 
199     @Override
200     public ChannelFuture leaveGroup(
201             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
202             final ChannelPromise promise) {
203         if (multicastAddress == null) {
204             throw new NullPointerException("multicastAddress");
205         }
206         if (networkInterface == null) {
207             throw new NullPointerException("networkInterface");
208         }
209 
210         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
211 
212         return promise;
213     }
214 
215     @Override
216     public ChannelFuture block(
217             InetAddress multicastAddress, NetworkInterface networkInterface,
218             InetAddress sourceToBlock) {
219         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
220     }
221 
222     @Override
223     public ChannelFuture block(
224             final InetAddress multicastAddress, final NetworkInterface networkInterface,
225             final InetAddress sourceToBlock, final ChannelPromise promise) {
226         if (multicastAddress == null) {
227             throw new NullPointerException("multicastAddress");
228         }
229         if (sourceToBlock == null) {
230             throw new NullPointerException("sourceToBlock");
231         }
232 
233         if (networkInterface == null) {
234             throw new NullPointerException("networkInterface");
235         }
236         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
237         return promise;
238     }
239 
240     @Override
241     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
242         return block(multicastAddress, sourceToBlock, newPromise());
243     }
244 
245     @Override
246     public ChannelFuture block(
247             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
248         try {
249             return block(
250                     multicastAddress,
251                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
252                     sourceToBlock, promise);
253         } catch (Throwable e) {
254             promise.setFailure(e);
255         }
256         return promise;
257     }
258 
259     @Override
260     protected AbstractEpollUnsafe newUnsafe() {
261         return new EpollDatagramChannelUnsafe();
262     }
263 
264     @Override
265     protected InetSocketAddress localAddress0() {
266         return local;
267     }
268 
269     @Override
270     protected InetSocketAddress remoteAddress0() {
271         return remote;
272     }
273 
274     @Override
275     protected void doBind(SocketAddress localAddress) throws Exception {
276         InetSocketAddress addr = (InetSocketAddress) localAddress;
277         checkResolvable(addr);
278         int fd = fd().intValue();
279         Native.bind(fd, addr);
280         local = Native.localAddress(fd);
281         active = true;
282     }
283 
284     @Override
285     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
286         for (;;) {
287             Object msg = in.current();
288             if (msg == null) {
289                 // Wrote all messages.
290                 clearFlag(Native.EPOLLOUT);
291                 break;
292             }
293 
294             try {
295                 // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
296                 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
297                     NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
298                     int cnt = array.count();
299 
300                     if (cnt >= 1) {
301                         // Try to use gathering writes via sendmmsg(...) syscall.
302                         int offset = 0;
303                         NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
304 
305                         while (cnt > 0) {
306                             int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
307                             if (send == 0) {
308                                 // Did not write all messages.
309                                 setFlag(Native.EPOLLOUT);
310                                 return;
311                             }
312                             for (int i = 0; i < send; i++) {
313                                 in.remove();
314                             }
315                             cnt -= send;
316                             offset += send;
317                         }
318                         continue;
319                     }
320                 }
321                 boolean done = false;
322                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
323                     if (doWriteMessage(msg)) {
324                         done = true;
325                         break;
326                     }
327                 }
328 
329                 if (done) {
330                     in.remove();
331                 } else {
332                     // Did not write all messages.
333                     setFlag(Native.EPOLLOUT);
334                     break;
335                 }
336             } catch (IOException e) {
337                 // Continue on write error as a DatagramChannel can write to multiple remote peers
338                 //
339                 // See https://github.com/netty/netty/issues/2665
340                 in.remove(e);
341             }
342         }
343     }
344 
345     private boolean doWriteMessage(Object msg) throws Exception {
346         final ByteBuf data;
347         InetSocketAddress remoteAddress;
348         if (msg instanceof AddressedEnvelope) {
349             @SuppressWarnings("unchecked")
350             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
351                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
352             data = envelope.content();
353             remoteAddress = envelope.recipient();
354         } else {
355             data = (ByteBuf) msg;
356             remoteAddress = null;
357         }
358 
359         final int dataLen = data.readableBytes();
360         if (dataLen == 0) {
361             return true;
362         }
363 
364         if (remoteAddress == null) {
365             remoteAddress = remote;
366             if (remoteAddress == null) {
367                 throw new NotYetConnectedException();
368             }
369         }
370 
371         final int writtenBytes;
372         if (data.hasMemoryAddress()) {
373             long memoryAddress = data.memoryAddress();
374             writtenBytes = Native.sendToAddress(fd().intValue(), memoryAddress, data.readerIndex(), data.writerIndex(),
375                     remoteAddress.getAddress(), remoteAddress.getPort());
376         } else if (data instanceof CompositeByteBuf) {
377             IovArray array = IovArrayThreadLocal.get((CompositeByteBuf) data);
378             int cnt = array.count();
379             assert cnt != 0;
380 
381             writtenBytes = Native.sendToAddresses(fd().intValue(), array.memoryAddress(0),
382                     cnt, remoteAddress.getAddress(), remoteAddress.getPort());
383         } else  {
384             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
385             writtenBytes = Native.sendTo(fd().intValue(), nioData, nioData.position(), nioData.limit(),
386                     remoteAddress.getAddress(), remoteAddress.getPort());
387         }
388 
389         return writtenBytes > 0;
390     }
391 
392     @Override
393     protected Object filterOutboundMessage(Object msg) {
394         if (msg instanceof DatagramPacket) {
395             DatagramPacket packet = (DatagramPacket) msg;
396             ByteBuf content = packet.content();
397             if (content.hasMemoryAddress()) {
398                 return msg;
399             }
400 
401             if (content.isDirect() && content instanceof CompositeByteBuf) {
402                 // Special handling of CompositeByteBuf to reduce memory copies if some of the Components
403                 // in the CompositeByteBuf are backed by a memoryAddress.
404                 CompositeByteBuf comp = (CompositeByteBuf) content;
405                 if (comp.isDirect() && comp.nioBufferCount() <= Native.IOV_MAX) {
406                     return msg;
407                 }
408             }
409             // We can only handle direct buffers so we need to copy if a non direct is
410             // passed to write.
411             return new DatagramPacket(newDirectBuffer(packet, content), packet.recipient());
412         }
413 
414         if (msg instanceof ByteBuf) {
415             ByteBuf buf = (ByteBuf) msg;
416             if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
417                 if (buf instanceof CompositeByteBuf) {
418                     // Special handling of CompositeByteBuf to reduce memory copies if some of the Components
419                     // in the CompositeByteBuf are backed by a memoryAddress.
420                     CompositeByteBuf comp = (CompositeByteBuf) buf;
421                     if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
422                         // more then 1024 buffers for gathering writes so just do a memory copy.
423                         buf = newDirectBuffer(buf);
424                         assert buf.hasMemoryAddress();
425                     }
426                 } else {
427                     // We can only handle buffers with memory address so we need to copy if a non direct is
428                     // passed to write.
429                     buf = newDirectBuffer(buf);
430                     assert buf.hasMemoryAddress();
431                 }
432             }
433             return buf;
434         }
435 
436         if (msg instanceof AddressedEnvelope) {
437             @SuppressWarnings("unchecked")
438             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
439             if (e.content() instanceof ByteBuf &&
440                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
441 
442                 ByteBuf content = (ByteBuf) e.content();
443                 if (content.hasMemoryAddress()) {
444                     return e;
445                 }
446                 if (content instanceof CompositeByteBuf) {
447                     // Special handling of CompositeByteBuf to reduce memory copies if some of the Components
448                     // in the CompositeByteBuf are backed by a memoryAddress.
449                     CompositeByteBuf comp = (CompositeByteBuf) content;
450                     if (comp.isDirect() && comp.nioBufferCount() <= Native.IOV_MAX) {
451                         return e;
452                     }
453                 }
454                 // We can only handle direct buffers so we need to copy if a non direct is
455                 // passed to write.
456                 return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
457                         newDirectBuffer(e, content), (InetSocketAddress) e.recipient());
458             }
459         }
460 
461         throw new UnsupportedOperationException(
462                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
463     }
464 
465     @Override
466     public EpollDatagramChannelConfig config() {
467         return config;
468     }
469 
470     @Override
471     protected void doDisconnect() throws Exception {
472         connected = false;
473     }
474 
475     final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
476 
477         private final List<Object> readBuf = new ArrayList<Object>();
478 
479         @Override
480         public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
481             boolean success = false;
482             try {
483                 try {
484                     boolean wasActive = isActive();
485                     InetSocketAddress remoteAddress = (InetSocketAddress) remote;
486                     if (local != null) {
487                         InetSocketAddress localAddress = (InetSocketAddress) local;
488                         doBind(localAddress);
489                     }
490 
491                     checkResolvable(remoteAddress);
492                     EpollDatagramChannel.this.remote = remoteAddress;
493                     EpollDatagramChannel.this.local = Native.localAddress(fd().intValue());
494                     success = true;
495 
496                     // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
497                     // because what happened is what happened.
498                     if (!wasActive && isActive()) {
499                         pipeline().fireChannelActive();
500                     }
501                 } finally {
502                     if (!success) {
503                         doClose();
504                     } else {
505                         channelPromise.setSuccess();
506                         connected = true;
507                     }
508                 }
509             } catch (Throwable cause) {
510                 channelPromise.setFailure(cause);
511             }
512         }
513 
514         @Override
515         void epollInReady() {
516             assert eventLoop().inEventLoop();
517             DatagramChannelConfig config = config();
518     boolean edgeTriggered = isFlagSet(Native.EPOLLET);
519 
520             if (!readPending && !edgeTriggered && !config.isAutoRead()) {
521                 // ChannelConfig.setAutoRead(false) was called in the meantime
522                 clearEpollIn0();
523                 return;
524             }
525 
526             RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
527 
528             final ChannelPipeline pipeline = pipeline();
529             Throwable exception = null;
530             try {
531                 // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
532                 final int maxMessagesPerRead = edgeTriggered
533                         ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
534                 int messages = 0;
535                 do {
536                     ByteBuf data = null;
537                     try {
538                         data = allocHandle.allocate(config.getAllocator());
539                         int writerIndex = data.writerIndex();
540                         DatagramSocketAddress remoteAddress;
541                         if (data.hasMemoryAddress()) {
542                             // has a memory address so use optimized call
543                             remoteAddress = Native.recvFromAddress(
544                                     fd().intValue(), data.memoryAddress(), writerIndex, data.capacity());
545                         } else {
546                             ByteBuffer nioData = data.internalNioBuffer(writerIndex, data.writableBytes());
547                             remoteAddress = Native.recvFrom(
548                                     fd().intValue(), nioData, nioData.position(), nioData.limit());
549                         }
550 
551                         if (remoteAddress == null) {
552                             break;
553                         }
554 
555                         int readBytes = remoteAddress.receivedAmount;
556                         data.writerIndex(data.writerIndex() + readBytes);
557                         allocHandle.record(readBytes);
558                         readPending = false;
559 
560                         readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
561                         data = null;
562                     } catch (Throwable t) {
563                         // We do not break from the loop here and remember the last exception,
564                         // because we need to consume everything from the socket used with epoll ET.
565                         exception = t;
566                     } finally {
567                         if (data != null) {
568                             data.release();
569                         }
570                         if (!edgeTriggered && !config.isAutoRead()) {
571                             // This is not using EPOLLET so we can stop reading
572                             // ASAP as we will get notified again later with
573                             // pending data
574                             break;
575                         }
576                     }
577                 } while (++ messages < maxMessagesPerRead);
578 
579                 int size = readBuf.size();
580                 for (int i = 0; i < size; i ++) {
581                     pipeline.fireChannelRead(readBuf.get(i));
582                 }
583 
584                 readBuf.clear();
585                 pipeline.fireChannelReadComplete();
586 
587                 if (exception != null) {
588                     pipeline.fireExceptionCaught(exception);
589                 }
590             } finally {
591                 // Check if there is a readPending which was not processed yet.
592                 // This could be for two reasons:
593                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
594                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
595                 //
596                 // See https://github.com/netty/netty/issues/2254
597                 if (!readPending && !config.isAutoRead()) {
598                     clearEpollIn();
599                 }
600             }
601         }
602     }
603 
604     /**
605      * Act as special {@link InetSocketAddress} to be able to easily pass all needed data from JNI without the need
606      * to create more objects then needed.
607      */
608     static final class DatagramSocketAddress extends InetSocketAddress {
609 
610         private static final long serialVersionUID = 1348596211215015739L;
611 
612         // holds the amount of received bytes
613         final int receivedAmount;
614 
615         DatagramSocketAddress(String addr, int port, int receivedAmount) {
616             super(addr, port);
617             this.receivedAmount = receivedAmount;
618         }
619     }
620 }