View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.AddressedEnvelope;
22  import io.netty.channel.ChannelException;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultAddressedEnvelope;
29  import io.netty.channel.socket.DatagramChannel;
30  import io.netty.channel.socket.DatagramPacket;
31  import io.netty.channel.socket.InternetProtocolFamily;
32  import io.netty.channel.socket.SocketProtocolFamily;
33  import io.netty.channel.unix.Errors;
34  import io.netty.channel.unix.Errors.NativeIoException;
35  import io.netty.channel.unix.UnixChannelUtil;
36  import io.netty.util.ReferenceCountUtil;
37  import io.netty.util.UncheckedBooleanSupplier;
38  import io.netty.util.internal.ObjectUtil;
39  import io.netty.util.internal.RecyclableArrayList;
40  import io.netty.util.internal.StringUtil;
41  import io.netty.util.internal.SystemPropertyUtil;
42  import io.netty.util.internal.logging.InternalLogger;
43  import io.netty.util.internal.logging.InternalLoggerFactory;
44  
45  import java.io.IOException;
46  import java.net.Inet4Address;
47  import java.net.InetAddress;
48  import java.net.InetSocketAddress;
49  import java.net.NetworkInterface;
50  import java.net.PortUnreachableException;
51  import java.net.SocketAddress;
52  import java.nio.ByteBuffer;
53  import java.nio.channels.UnresolvedAddressException;
54  
55  import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
56  
57  /**
58   * {@link DatagramChannel} implementation that uses linux EPOLL.
59   */
60  public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
61      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollDatagramChannel.class);
62      private static final boolean IP_MULTICAST_ALL =
63              SystemPropertyUtil.getBoolean("io.netty.channel.epoll.ipMulticastAll", false);
64      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
65      private static final String EXPECTED_TYPES =
66              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
67              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
68              StringUtil.simpleClassName(ByteBuf.class) + ", " +
69              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
70              StringUtil.simpleClassName(ByteBuf.class) + ')';
71  
72      private final EpollDatagramChannelConfig config;
73      private volatile boolean connected;
74  
75      static {
76          if (logger.isDebugEnabled()) {
77              logger.debug("-Dio.netty.channel.epoll.ipMulticastAll: {}", IP_MULTICAST_ALL);
78          }
79      }
80  
81      /**
82       * Returns {@code true} if {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported natively.
83       *
84       * @return {@code true} if supported, {@code false} otherwise.
85       */
86      public static boolean isSegmentedDatagramPacketSupported() {
87          return Epoll.isAvailable() &&
88                  // We only support it together with sendmmsg(...)
89                  Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
90      }
91  
92      /**
93       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
94       * on the Operation Systems default which will be chosen.
95       */
96      public EpollDatagramChannel() {
97          this((SocketProtocolFamily) null);
98      }
99  
100     /**
101      * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
102      * on the Operation Systems default which will be chosen.
103      *
104      * @deprecated use {@link EpollDatagramChannel#EpollDatagramChannel(SocketProtocolFamily)}
105      */
106     @Deprecated
107     public EpollDatagramChannel(InternetProtocolFamily family) {
108         this(newSocketDgram(family), false);
109     }
110 
111     /**
112      * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used it will depend
113      * on the Operation Systems default which will be chosen.
114      */
115     public EpollDatagramChannel(SocketProtocolFamily family) {
116         this(newSocketDgram(family), false);
117     }
118 
119     /**
120      * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
121      * on the Operation Systems default which will be chosen.
122      */
123     public EpollDatagramChannel(int fd) {
124         this(new LinuxSocket(fd), true);
125     }
126 
127     private EpollDatagramChannel(LinuxSocket fd, boolean active) {
128         super(null, fd, active, EpollIoOps.valueOf(0));
129 
130         // Configure IP_MULTICAST_ALL - disable by default to match the behaviour of NIO.
131         try {
132             fd.setIpMulticastAll(IP_MULTICAST_ALL);
133         } catch (IOException | ChannelException e) {
134             logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, e);
135         }
136 
137         config = new EpollDatagramChannelConfig(this);
138     }
139 
140     @Override
141     public InetSocketAddress remoteAddress() {
142         return (InetSocketAddress) super.remoteAddress();
143     }
144 
145     @Override
146     public InetSocketAddress localAddress() {
147         return (InetSocketAddress) super.localAddress();
148     }
149 
150     @Override
151     public ChannelMetadata metadata() {
152         return METADATA;
153     }
154 
155     @Override
156     public boolean isActive() {
157         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
158     }
159 
160     @Override
161     public boolean isConnected() {
162         return connected;
163     }
164 
165     @Override
166     public ChannelFuture joinGroup(InetAddress multicastAddress) {
167         return joinGroup(multicastAddress, newPromise());
168     }
169 
170     @Override
171     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
172         try {
173             NetworkInterface iface = config().getNetworkInterface();
174             if (iface == null) {
175                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
176             }
177             return joinGroup(multicastAddress, iface, null, promise);
178         } catch (IOException e) {
179             promise.setFailure(e);
180         }
181         return promise;
182     }
183 
184     @Override
185     public ChannelFuture joinGroup(
186             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
187         return joinGroup(multicastAddress, networkInterface, newPromise());
188     }
189 
190     @Override
191     public ChannelFuture joinGroup(
192             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
193             ChannelPromise promise) {
194         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
195     }
196 
197     @Override
198     public ChannelFuture joinGroup(
199             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
200         return joinGroup(multicastAddress, networkInterface, source, newPromise());
201     }
202 
203     @Override
204     public ChannelFuture joinGroup(
205             final InetAddress multicastAddress, final NetworkInterface networkInterface,
206             final InetAddress source, final ChannelPromise promise) {
207 
208         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
209         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
210 
211         if (eventLoop().inEventLoop()) {
212             joinGroup0(multicastAddress, networkInterface, source, promise);
213         } else {
214             eventLoop().execute(new Runnable() {
215                 @Override
216                 public void run() {
217                     joinGroup0(multicastAddress, networkInterface, source, promise);
218                 }
219             });
220         }
221         return promise;
222     }
223 
224     private void joinGroup0(
225             final InetAddress multicastAddress, final NetworkInterface networkInterface,
226             final InetAddress source, final ChannelPromise promise) {
227         assert eventLoop().inEventLoop();
228 
229         try {
230             socket.joinGroup(multicastAddress, networkInterface, source);
231             promise.setSuccess();
232         } catch (IOException e) {
233             promise.setFailure(e);
234         }
235     }
236 
237     @Override
238     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
239         return leaveGroup(multicastAddress, newPromise());
240     }
241 
242     @Override
243     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
244         try {
245             return leaveGroup(
246                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
247         } catch (IOException e) {
248             promise.setFailure(e);
249         }
250         return promise;
251     }
252 
253     @Override
254     public ChannelFuture leaveGroup(
255             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
256         return leaveGroup(multicastAddress, networkInterface, newPromise());
257     }
258 
259     @Override
260     public ChannelFuture leaveGroup(
261             InetSocketAddress multicastAddress,
262             NetworkInterface networkInterface, ChannelPromise promise) {
263         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
264     }
265 
266     @Override
267     public ChannelFuture leaveGroup(
268             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
269         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
270     }
271 
272     @Override
273     public ChannelFuture leaveGroup(
274             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
275             final ChannelPromise promise) {
276         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
277         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
278 
279         if (eventLoop().inEventLoop()) {
280             leaveGroup0(multicastAddress, networkInterface, source, promise);
281         } else {
282             eventLoop().execute(new Runnable() {
283                 @Override
284                 public void run() {
285                     leaveGroup0(multicastAddress, networkInterface, source, promise);
286                 }
287             });
288         }
289         return promise;
290     }
291 
292     private void leaveGroup0(
293             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
294             final ChannelPromise promise) {
295         assert eventLoop().inEventLoop();
296 
297         try {
298             socket.leaveGroup(multicastAddress, networkInterface, source);
299             promise.setSuccess();
300         } catch (IOException e) {
301             promise.setFailure(e);
302         }
303     }
304 
305     @Override
306     public ChannelFuture block(
307             InetAddress multicastAddress, NetworkInterface networkInterface,
308             InetAddress sourceToBlock) {
309         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
310     }
311 
312     @Override
313     public ChannelFuture block(
314             final InetAddress multicastAddress, final NetworkInterface networkInterface,
315             final InetAddress sourceToBlock, final ChannelPromise promise) {
316         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
317         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
318         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
319 
320         promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
321         return promise;
322     }
323 
324     @Override
325     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
326         return block(multicastAddress, sourceToBlock, newPromise());
327     }
328 
329     @Override
330     public ChannelFuture block(
331             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
332         try {
333             return block(
334                     multicastAddress,
335                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
336                     sourceToBlock, promise);
337         } catch (Throwable e) {
338             promise.setFailure(e);
339         }
340         return promise;
341     }
342 
343     @Override
344     protected AbstractEpollUnsafe newUnsafe() {
345         return new EpollDatagramChannelUnsafe();
346     }
347 
348     @Override
349     protected void doRegister(ChannelPromise promise) {
350         super.doRegister(promise);
351         promise.addListener(f -> {
352             if (f.isSuccess() && isRegistered()) {
353                 // As Datagram is connection-less we can submit the current ops once the registration itself was
354                 // successful.
355                 submitCurrentOps();
356             }
357         });
358     }
359 
360     @Override
361     protected void doBind(SocketAddress localAddress) throws Exception {
362         if (localAddress instanceof InetSocketAddress) {
363             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
364             if (socketAddress.getAddress().isAnyLocalAddress() &&
365                     socketAddress.getAddress() instanceof Inet4Address) {
366                 if (socket.family() == SocketProtocolFamily.INET6) {
367                     localAddress = new InetSocketAddress(Native.INET6_ANY, socketAddress.getPort());
368                 }
369             }
370         }
371         super.doBind(localAddress);
372         active = true;
373     }
374 
375     @Override
376     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
377         int maxMessagesPerWrite = maxMessagesPerWrite();
378         while (maxMessagesPerWrite > 0) {
379             Object msg = in.current();
380             if (msg == null) {
381                 // Wrote all messages.
382                 break;
383             }
384 
385             try {
386                 // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
387                 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
388                         // We only handle UDP_SEGMENT in sendmmsg.
389                         in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
390                     NativeDatagramPacketArray array = cleanDatagramPacketArray();
391                     array.add(in, isConnected(), maxMessagesPerWrite);
392                     int cnt = array.count();
393 
394                     if (cnt >= 1) {
395                         // Try to use gathering writes via sendmmsg(...) syscall.
396                         int offset = 0;
397                         NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
398 
399                         int send = socket.sendmmsg(packets, offset, cnt);
400                         if (send == 0) {
401                             // Did not write all messages.
402                             break;
403                         }
404                         for (int i = 0; i < send; i++) {
405                             in.remove();
406                         }
407                         maxMessagesPerWrite -= send;
408                         continue;
409                     }
410                 }
411                 boolean done = false;
412                 for (int i = config().getWriteSpinCount(); i > 0; --i) {
413                     if (doWriteMessage(msg)) {
414                         done = true;
415                         break;
416                     }
417                 }
418 
419                 if (done) {
420                     in.remove();
421                     maxMessagesPerWrite --;
422                 } else {
423                     break;
424                 }
425             } catch (IOException e) {
426                 maxMessagesPerWrite --;
427                 // Continue on write error as a DatagramChannel can write to multiple remote peers
428                 //
429                 // See https://github.com/netty/netty/issues/2665
430                 in.remove(e);
431             }
432         }
433 
434         if (in.isEmpty()) {
435             // Did write all messages.
436             clearFlag(Native.EPOLLOUT);
437         } else {
438             // Did not write all messages.
439             setFlag(Native.EPOLLOUT);
440         }
441     }
442 
443     private boolean doWriteMessage(Object msg) throws Exception {
444         final ByteBuf data;
445         final InetSocketAddress remoteAddress;
446         if (msg instanceof AddressedEnvelope) {
447             @SuppressWarnings("unchecked")
448             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
449                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
450             data = envelope.content();
451             remoteAddress = envelope.recipient();
452         } else {
453             data = (ByteBuf) msg;
454             remoteAddress = null;
455         }
456 
457         final int dataLen = data.readableBytes();
458         if (dataLen == 0) {
459             return true;
460         }
461 
462         try {
463             return doWriteOrSendBytes(data, remoteAddress, false) > 0;
464         } catch (NativeIoException e) {
465             if (remoteAddress == null) {
466                 throw translateForConnected(e);
467             }
468             throw e;
469         }
470     }
471 
472     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
473         if (envelope.recipient() instanceof InetSocketAddress
474                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
475             throw new UnresolvedAddressException();
476         }
477     }
478 
479     @Override
480     protected Object filterOutboundMessage(Object msg) {
481         if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
482             if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
483                 throw new UnsupportedOperationException(
484                         "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
485             }
486             io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
487             checkUnresolved(packet);
488 
489             ByteBuf content = packet.content();
490             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
491                     packet.replace(newDirectBuffer(packet, content)) : msg;
492         }
493         if (msg instanceof DatagramPacket) {
494             DatagramPacket packet = (DatagramPacket) msg;
495             checkUnresolved(packet);
496 
497             ByteBuf content = packet.content();
498             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
499                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
500         }
501 
502         if (msg instanceof ByteBuf) {
503             ByteBuf buf = (ByteBuf) msg;
504             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
505         }
506 
507         if (msg instanceof AddressedEnvelope) {
508             @SuppressWarnings("unchecked")
509             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
510             checkUnresolved(e);
511 
512             if (e.content() instanceof ByteBuf &&
513                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
514 
515                 ByteBuf content = (ByteBuf) e.content();
516                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
517                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
518                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
519             }
520         }
521 
522         throw new UnsupportedOperationException(
523                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
524     }
525 
526     @Override
527     public EpollDatagramChannelConfig config() {
528         return config;
529     }
530 
531     @Override
532     protected void doDisconnect() throws Exception {
533         socket.disconnect();
534         connected = active = false;
535         resetCachedAddresses();
536     }
537 
538     @Override
539     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
540         if (super.doConnect(remoteAddress, localAddress)) {
541             connected = true;
542             return true;
543         }
544         return false;
545     }
546 
547     @Override
548     protected void doClose() throws Exception {
549         super.doClose();
550         connected = false;
551     }
552 
553     final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
554 
555         @Override
556         void epollInReady() {
557             assert eventLoop().inEventLoop();
558             EpollDatagramChannelConfig config = config();
559             if (shouldBreakEpollInReady(config)) {
560                 clearEpollIn0();
561                 return;
562             }
563             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
564             final ChannelPipeline pipeline = pipeline();
565             final ByteBufAllocator allocator = config.getAllocator();
566             allocHandle.reset(config);
567 
568             Throwable exception = null;
569             try {
570                 try {
571                     boolean connected = isConnected();
572                     do {
573                         final boolean read;
574                         int datagramSize = config().getMaxDatagramPayloadSize();
575 
576                         ByteBuf byteBuf = allocHandle.allocate(allocator);
577                         // Only try to use recvmmsg if its really supported by the running system.
578                         int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
579                                 datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
580                                 0;
581                         try {
582                             if (numDatagram <= 1) {
583                                 if (!connected || config.isUdpGro()) {
584                                     read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
585                                 } else {
586                                     read = connectedRead(allocHandle, byteBuf, datagramSize);
587                                 }
588                             } else {
589                                 // Try to use scattering reads via recvmmsg(...) syscall.
590                                 read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
591                                         byteBuf, datagramSize, numDatagram);
592                             }
593                         } catch (NativeIoException e) {
594                             if (connected) {
595                                 throw translateForConnected(e);
596                             }
597                             throw e;
598                         }
599 
600                         if (read) {
601                             readPending = false;
602                         } else {
603                             break;
604                         }
605                     // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
606                     // as we read anything).
607                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
608                 } catch (Throwable t) {
609                     exception = t;
610                 }
611 
612                 allocHandle.readComplete();
613                 pipeline.fireChannelReadComplete();
614 
615                 if (exception != null) {
616                     pipeline.fireExceptionCaught(exception);
617                 }
618             } finally {
619                 if (shouldStopReading(config)) {
620                     clearEpollIn();
621                 }
622             }
623         }
624     }
625 
626     private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
627             throws Exception {
628         try {
629             int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
630                     : byteBuf.writableBytes();
631             allocHandle.attemptedBytesRead(writable);
632 
633             int writerIndex = byteBuf.writerIndex();
634             int localReadAmount;
635             if (byteBuf.hasMemoryAddress()) {
636                 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
637             } else {
638                 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
639                 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
640             }
641 
642             if (localReadAmount <= 0) {
643                 allocHandle.lastBytesRead(localReadAmount);
644 
645                 // nothing was read, release the buffer.
646                 return false;
647             }
648             byteBuf.writerIndex(writerIndex + localReadAmount);
649 
650             allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
651                     localReadAmount : writable);
652 
653             DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
654             allocHandle.incMessagesRead(1);
655 
656             pipeline().fireChannelRead(packet);
657             byteBuf = null;
658             return true;
659         } finally {
660             if (byteBuf != null) {
661                 byteBuf.release();
662             }
663         }
664     }
665 
666     private IOException translateForConnected(NativeIoException e) {
667         // We need to correctly translate connect errors to match NIO behaviour.
668         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
669             PortUnreachableException error = new PortUnreachableException(e.getMessage());
670             error.initCause(e);
671             return error;
672         }
673         return e;
674     }
675 
676     private static void addDatagramPacketToOut(DatagramPacket packet,
677                                               RecyclableArrayList out) {
678         if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
679             io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
680                     (io.netty.channel.unix.SegmentedDatagramPacket) packet;
681             ByteBuf content = segmentedDatagramPacket.content();
682             InetSocketAddress recipient = segmentedDatagramPacket.recipient();
683             InetSocketAddress sender = segmentedDatagramPacket.sender();
684             int segmentSize = segmentedDatagramPacket.segmentSize();
685             do {
686                 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
687                         segmentSize)), recipient, sender));
688             } while (content.isReadable());
689 
690             segmentedDatagramPacket.release();
691         } else {
692             out.add(packet);
693         }
694     }
695 
696     private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
697         if (byteBuf != null) {
698             byteBuf.release();
699         }
700         if (packetList != null) {
701             for (int i = 0; i < packetList.size(); i++) {
702                 ReferenceCountUtil.release(packetList.get(i));
703             }
704             packetList.recycle();
705         }
706     }
707 
708     private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
709                                       int bytesRead, DatagramPacket packet) {
710         handle.lastBytesRead(Math.max(1, bytesRead)); // Avoid signalling end-of-data for zero-sized datagrams.
711         handle.incMessagesRead(1);
712         pipeline.fireChannelRead(packet);
713     }
714 
715     private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
716                                           int bytesRead, RecyclableArrayList packetList) {
717         int messagesRead = packetList.size();
718         handle.lastBytesRead(Math.max(1, bytesRead)); // Avoid signalling end-of-data for zero-sized datagrams.
719         handle.incMessagesRead(messagesRead);
720         for (int i = 0; i < messagesRead; i++) {
721             pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
722         }
723     }
724 
725     private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
726                             NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
727         RecyclableArrayList datagramPackets = null;
728         try {
729             int writable = byteBuf.writableBytes();
730 
731             boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
732             assert added;
733 
734             allocHandle.attemptedBytesRead(writable);
735 
736             NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
737 
738             int bytesReceived = socket.recvmsg(msg);
739             if (!msg.hasSender()) {
740                 allocHandle.lastBytesRead(-1);
741                 return false;
742             }
743             byteBuf.writerIndex(bytesReceived);
744             InetSocketAddress local = localAddress();
745             DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
746             if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
747                 processPacket(pipeline(), allocHandle, bytesReceived, packet);
748             } else {
749                 // Its important that we process all received data out of the NativeDatagramPacketArray
750                 // before we call fireChannelRead(...). This is because the user may call flush()
751                 // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
752                 datagramPackets = RecyclableArrayList.newInstance();
753                 addDatagramPacketToOut(packet, datagramPackets);
754 
755                 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
756                 datagramPackets.recycle();
757                 datagramPackets = null;
758             }
759 
760             return true;
761         } finally {
762             releaseAndRecycle(byteBuf, datagramPackets);
763         }
764     }
765 
766     private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
767             ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
768         RecyclableArrayList datagramPackets = null;
769         try {
770             int offset = byteBuf.writerIndex();
771             for (int i = 0; i < numDatagram;  i++, offset += datagramSize) {
772                 if (!array.addWritable(byteBuf, offset, datagramSize)) {
773                     break;
774                 }
775             }
776 
777             allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
778 
779             NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
780 
781             int received = socket.recvmmsg(packets, 0, array.count());
782             if (received == 0) {
783                 allocHandle.lastBytesRead(-1);
784                 return false;
785             }
786 
787             InetSocketAddress local = localAddress();
788 
789             // Set the writerIndex too the maximum number of bytes we might have read.
790             int bytesReceived = received * datagramSize;
791             byteBuf.writerIndex(byteBuf.writerIndex() + bytesReceived);
792 
793             if (received == 1) {
794                 // Single packet fast-path
795                 DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
796                 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
797                     processPacket(pipeline(), allocHandle, datagramSize, packet);
798                     return true;
799                 }
800             }
801             // Its important that we process all received data out of the NativeDatagramPacketArray
802             // before we call fireChannelRead(...). This is because the user may call flush()
803             // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
804             datagramPackets = RecyclableArrayList.newInstance();
805             for (int i = 0; i < received; i++) {
806                 DatagramPacket packet = packets[i].newDatagramPacket(byteBuf, local);
807 
808                 // We need to skip the maximum datagram size to ensure we have the readerIndex in the right position
809                 // for the next one.
810                 byteBuf.skipBytes(datagramSize);
811                 addDatagramPacketToOut(packet, datagramPackets);
812             }
813             // Ass we did use readRetainedSlice(...) before we should now release the byteBuf and null it out.
814             byteBuf.release();
815             byteBuf = null;
816 
817             processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
818             datagramPackets.recycle();
819             datagramPackets = null;
820             return true;
821         } finally {
822             releaseAndRecycle(byteBuf, datagramPackets);
823         }
824     }
825 
826     private NativeDatagramPacketArray cleanDatagramPacketArray() {
827         return ((NativeArrays) registration().attachment()).cleanDatagramPacketArray();
828     }
829 }