View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.AddressedEnvelope;
22  import io.netty.channel.ChannelException;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultAddressedEnvelope;
29  import io.netty.channel.socket.DatagramChannel;
30  import io.netty.channel.socket.DatagramPacket;
31  import io.netty.channel.socket.InternetProtocolFamily;
32  import io.netty.channel.socket.SocketProtocolFamily;
33  import io.netty.channel.unix.Errors;
34  import io.netty.channel.unix.Errors.NativeIoException;
35  import io.netty.channel.unix.UnixChannelUtil;
36  import io.netty.util.ReferenceCountUtil;
37  import io.netty.util.UncheckedBooleanSupplier;
38  import io.netty.util.internal.ObjectUtil;
39  import io.netty.util.internal.RecyclableArrayList;
40  import io.netty.util.internal.StringUtil;
41  import io.netty.util.internal.SystemPropertyUtil;
42  import io.netty.util.internal.logging.InternalLogger;
43  import io.netty.util.internal.logging.InternalLoggerFactory;
44  
45  import java.io.IOException;
46  import java.net.Inet4Address;
47  import java.net.InetAddress;
48  import java.net.InetSocketAddress;
49  import java.net.NetworkInterface;
50  import java.net.PortUnreachableException;
51  import java.net.SocketAddress;
52  import java.nio.ByteBuffer;
53  import java.nio.channels.UnresolvedAddressException;
54  
55  import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
56  
57  /**
58   * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
59   * maximal performance.
60   */
61  public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollDatagramChannel.class);
    // Value applied to the socket's IP_MULTICAST_ALL option in the constructor; read once from the
    // "io.netty.channel.epoll.ipMulticastAll" system property and defaulting to false.
    private static final boolean IP_MULTICAST_ALL =
            SystemPropertyUtil.getBoolean("io.netty.channel.epoll.ipMulticastAll", false);
    // Shared metadata instance returned by metadata().
    // NOTE(review): the meaning of the (true, 16) arguments is defined by ChannelMetadata - confirm there.
    private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
    // Suffix appended to the "unsupported message type" error raised by filterOutboundMessage(...).
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
            StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
            StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
            StringUtil.simpleClassName(ByteBuf.class) + ')';

    private final EpollDatagramChannelConfig config;
    // Set to true by a successful doConnect(...), reset to false by doDisconnect() and doClose().
    private volatile boolean connected;

    static {
        // Log the effective IP_MULTICAST_ALL setting once when the class is initialized.
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.channel.epoll.ipMulticastAll: {}", IP_MULTICAST_ALL);
        }
    }
81  
82      /**
83       * Returns {@code true} if {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported natively.
84       *
85       * @return {@code true} if supported, {@code false} otherwise.
86       */
87      public static boolean isSegmentedDatagramPacketSupported() {
88          return Epoll.isAvailable() &&
89                  // We only support it together with sendmmsg(...)
90                  Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
91      }
92  
    /**
     * Create a new instance without an explicit {@link SocketProtocolFamily}. A {@code null} family is
     * passed on, so the protocol family depends on the operating system's default.
     */
    public EpollDatagramChannel() {
        this((SocketProtocolFamily) null);
    }
100 
    /**
     * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used, the
     * protocol family depends on the operating system's default.
     *
     * @param family the protocol family of the new datagram socket, or {@code null}.
     * @deprecated use {@link EpollDatagramChannel#EpollDatagramChannel(SocketProtocolFamily)}
     */
    @Deprecated
    public EpollDatagramChannel(InternetProtocolFamily family) {
        this(newSocketDgram(family), false);
    }
111 
    /**
     * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used, the
     * protocol family depends on the operating system's default.
     *
     * @param family the protocol family of the new datagram socket, or {@code null}.
     */
    public EpollDatagramChannel(SocketProtocolFamily family) {
        this(newSocketDgram(family), false);
    }
119 
    /**
     * Create a new instance that wraps the given file descriptor in a {@link LinuxSocket}.
     * Unlike the other public constructors, the channel is created with {@code active = true}
     * (presumably because the descriptor is expected to refer to an already usable socket - TODO confirm).
     *
     * @param fd the file descriptor of the socket to wrap.
     */
    public EpollDatagramChannel(int fd) {
        this(new LinuxSocket(fd), true);
    }
127 
    /**
     * Shared constructor used by all public constructors: hands the socket to the parent channel,
     * applies the {@code IP_MULTICAST_ALL} setting and creates the channel configuration.
     *
     * @param fd     the socket backing this channel.
     * @param active the initial active state passed to the parent constructor.
     */
    private EpollDatagramChannel(LinuxSocket fd, boolean active) {
        super(null, fd, active, EpollIoOps.valueOf(0));

        // Configure IP_MULTICAST_ALL - disable by default to match the behaviour of NIO.
        try {
            fd.setIpMulticastAll(IP_MULTICAST_ALL);
        } catch (IOException | ChannelException e) {
            // Best effort: a failure is only logged and does not prevent the channel from being created.
            logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, e);
        }

        config = new EpollDatagramChannelConfig(this);
    }
140 
141     @Override
142     public InetSocketAddress remoteAddress() {
143         return (InetSocketAddress) super.remoteAddress();
144     }
145 
146     @Override
147     public InetSocketAddress localAddress() {
148         return (InetSocketAddress) super.localAddress();
149     }
150 
    /**
     * Returns the shared {@link ChannelMetadata} instance used by every {@link EpollDatagramChannel}.
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
155 
156     @Override
157     public boolean isActive() {
158         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
159     }
160 
    /**
     * Returns the {@code connected} flag, which is set by a successful {@code doConnect(...)} and
     * cleared by {@code doDisconnect()} and {@code doClose()}.
     */
    @Override
    public boolean isConnected() {
        return connected;
    }
165 
166     @Override
167     public ChannelFuture joinGroup(InetAddress multicastAddress) {
168         return joinGroup(multicastAddress, newPromise());
169     }
170 
171     @Override
172     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
173         try {
174             NetworkInterface iface = config().getNetworkInterface();
175             if (iface == null) {
176                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
177             }
178             return joinGroup(multicastAddress, iface, null, promise);
179         } catch (IOException e) {
180             promise.setFailure(e);
181         }
182         return promise;
183     }
184 
185     @Override
186     public ChannelFuture joinGroup(
187             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
188         return joinGroup(multicastAddress, networkInterface, newPromise());
189     }
190 
191     @Override
192     public ChannelFuture joinGroup(
193             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
194             ChannelPromise promise) {
195         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
196     }
197 
198     @Override
199     public ChannelFuture joinGroup(
200             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
201         return joinGroup(multicastAddress, networkInterface, source, newPromise());
202     }
203 
204     @Override
205     public ChannelFuture joinGroup(
206             final InetAddress multicastAddress, final NetworkInterface networkInterface,
207             final InetAddress source, final ChannelPromise promise) {
208 
209         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
210         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
211 
212         if (eventLoop().inEventLoop()) {
213             joinGroup0(multicastAddress, networkInterface, source, promise);
214         } else {
215             eventLoop().execute(new Runnable() {
216                 @Override
217                 public void run() {
218                     joinGroup0(multicastAddress, networkInterface, source, promise);
219                 }
220             });
221         }
222         return promise;
223     }
224 
225     private void joinGroup0(
226             final InetAddress multicastAddress, final NetworkInterface networkInterface,
227             final InetAddress source, final ChannelPromise promise) {
228         assert eventLoop().inEventLoop();
229 
230         try {
231             socket.joinGroup(multicastAddress, networkInterface, source);
232             promise.setSuccess();
233         } catch (IOException e) {
234             promise.setFailure(e);
235         }
236     }
237 
238     @Override
239     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
240         return leaveGroup(multicastAddress, newPromise());
241     }
242 
243     @Override
244     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
245         try {
246             return leaveGroup(
247                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
248         } catch (IOException e) {
249             promise.setFailure(e);
250         }
251         return promise;
252     }
253 
254     @Override
255     public ChannelFuture leaveGroup(
256             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
257         return leaveGroup(multicastAddress, networkInterface, newPromise());
258     }
259 
260     @Override
261     public ChannelFuture leaveGroup(
262             InetSocketAddress multicastAddress,
263             NetworkInterface networkInterface, ChannelPromise promise) {
264         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
265     }
266 
267     @Override
268     public ChannelFuture leaveGroup(
269             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
270         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
271     }
272 
273     @Override
274     public ChannelFuture leaveGroup(
275             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
276             final ChannelPromise promise) {
277         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
278         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
279 
280         if (eventLoop().inEventLoop()) {
281             leaveGroup0(multicastAddress, networkInterface, source, promise);
282         } else {
283             eventLoop().execute(new Runnable() {
284                 @Override
285                 public void run() {
286                     leaveGroup0(multicastAddress, networkInterface, source, promise);
287                 }
288             });
289         }
290         return promise;
291     }
292 
293     private void leaveGroup0(
294             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
295             final ChannelPromise promise) {
296         assert eventLoop().inEventLoop();
297 
298         try {
299             socket.leaveGroup(multicastAddress, networkInterface, source);
300             promise.setSuccess();
301         } catch (IOException e) {
302             promise.setFailure(e);
303         }
304     }
305 
306     @Override
307     public ChannelFuture block(
308             InetAddress multicastAddress, NetworkInterface networkInterface,
309             InetAddress sourceToBlock) {
310         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
311     }
312 
313     @Override
314     public ChannelFuture block(
315             final InetAddress multicastAddress, final NetworkInterface networkInterface,
316             final InetAddress sourceToBlock, final ChannelPromise promise) {
317         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
318         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
319         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
320 
321         promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
322         return promise;
323     }
324 
325     @Override
326     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
327         return block(multicastAddress, sourceToBlock, newPromise());
328     }
329 
330     @Override
331     public ChannelFuture block(
332             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
333         try {
334             return block(
335                     multicastAddress,
336                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
337                     sourceToBlock, promise);
338         } catch (Throwable e) {
339             promise.setFailure(e);
340         }
341         return promise;
342     }
343 
    /**
     * Creates the datagram-specific unsafe implementation that handles epoll read readiness.
     */
    @Override
    protected AbstractEpollUnsafe newUnsafe() {
        return new EpollDatagramChannelUnsafe();
    }
348 
349     @Override
350     protected void doBind(SocketAddress localAddress) throws Exception {
351         if (localAddress instanceof InetSocketAddress) {
352             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
353             if (socketAddress.getAddress().isAnyLocalAddress() &&
354                     socketAddress.getAddress() instanceof Inet4Address) {
355                 if (socket.family() == SocketProtocolFamily.INET6) {
356                     localAddress = new InetSocketAddress(Native.INET6_ANY, socketAddress.getPort());
357                 }
358             }
359         }
360         super.doBind(localAddress);
361         active = true;
362     }
363 
    /**
     * Writes pending messages from the outbound buffer to the socket, spending at most
     * {@code maxMessagesPerWrite()} messages per invocation.
     *
     * <p>When several messages are pending and {@code sendmmsg(...)} is supported, or when the current
     * message is a segmented datagram packet, the messages are sent as a batch. Otherwise a single
     * message is written, retrying up to the configured write spin count. An {@link IOException} only
     * fails the affected message; the loop then continues with the next one.
     *
     * <p>On exit the {@code EPOLLOUT} flag is cleared when the buffer is empty and set otherwise.
     *
     * @param in the outbound buffer holding the messages to write.
     */
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        // Remaining number of messages that may be written during this call.
        int maxMessagesPerWrite = maxMessagesPerWrite();
        while (maxMessagesPerWrite > 0) {
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                break;
            }

            try {
                // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
                if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
                        // We only handle UDP_SEGMENT in sendmmsg.
                        in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
                    NativeDatagramPacketArray array = cleanDatagramPacketArray();
                    array.add(in, isConnected(), maxMessagesPerWrite);
                    int cnt = array.count();

                    // If nothing could be added to the array, fall through to the single-message path.
                    if (cnt >= 1) {
                        // Try to use gathering writes via sendmmsg(...) syscall.
                        int offset = 0;
                        NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();

                        int send = socket.sendmmsg(packets, offset, cnt);
                        if (send == 0) {
                            // Did not write all messages.
                            break;
                        }
                        // Drop every message that was handed to the kernel from the outbound buffer.
                        for (int i = 0; i < send; i++) {
                            in.remove();
                        }
                        maxMessagesPerWrite -= send;
                        continue;
                    }
                }
                // Single-message path: retry up to the configured write spin count.
                boolean done = false;
                for (int i = config().getWriteSpinCount(); i > 0; --i) {
                    if (doWriteMessage(msg)) {
                        done = true;
                        break;
                    }
                }

                if (done) {
                    in.remove();
                    maxMessagesPerWrite --;
                } else {
                    // The message could not be written; stop and leave it in the buffer.
                    break;
                }
            } catch (IOException e) {
                // A failed message still counts against the per-call budget.
                maxMessagesPerWrite --;
                // Continue on write error as a DatagramChannel can write to multiple remote peers
                //
                // See https://github.com/netty/netty/issues/2665
                in.remove(e);
            }
        }

        if (in.isEmpty()) {
            // Did write all messages.
            clearFlag(Native.EPOLLOUT);
        } else {
            // Did not write all messages.
            setFlag(Native.EPOLLOUT);
        }
    }
431 
432     private boolean doWriteMessage(Object msg) throws Exception {
433         final ByteBuf data;
434         final InetSocketAddress remoteAddress;
435         if (msg instanceof AddressedEnvelope) {
436             @SuppressWarnings("unchecked")
437             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
438                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
439             data = envelope.content();
440             remoteAddress = envelope.recipient();
441         } else {
442             data = (ByteBuf) msg;
443             remoteAddress = null;
444         }
445 
446         final int dataLen = data.readableBytes();
447         if (dataLen == 0) {
448             return true;
449         }
450 
451         try {
452             return doWriteOrSendBytes(data, remoteAddress, false) > 0;
453         } catch (NativeIoException e) {
454             if (remoteAddress == null) {
455                 throw translateForConnected(e);
456             }
457             throw e;
458         }
459     }
460 
461     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
462         if (envelope.recipient() instanceof InetSocketAddress
463                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
464             throw new UnresolvedAddressException();
465         }
466     }
467 
468     @Override
469     protected Object filterOutboundMessage(Object msg) {
470         if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
471             if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
472                 throw new UnsupportedOperationException(
473                         "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
474             }
475             io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
476             checkUnresolved(packet);
477 
478             ByteBuf content = packet.content();
479             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
480                     packet.replace(newDirectBuffer(packet, content)) : msg;
481         }
482         if (msg instanceof DatagramPacket) {
483             DatagramPacket packet = (DatagramPacket) msg;
484             checkUnresolved(packet);
485 
486             ByteBuf content = packet.content();
487             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
488                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
489         }
490 
491         if (msg instanceof ByteBuf) {
492             ByteBuf buf = (ByteBuf) msg;
493             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
494         }
495 
496         if (msg instanceof AddressedEnvelope) {
497             @SuppressWarnings("unchecked")
498             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
499             checkUnresolved(e);
500 
501             if (e.content() instanceof ByteBuf &&
502                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
503 
504                 ByteBuf content = (ByteBuf) e.content();
505                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
506                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
507                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
508             }
509         }
510 
511         throw new UnsupportedOperationException(
512                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
513     }
514 
    /**
     * Returns the configuration object created for this channel in the constructor.
     */
    @Override
    public EpollDatagramChannelConfig config() {
        return config;
    }
519 
520     @Override
521     protected void doDisconnect() throws Exception {
522         socket.disconnect();
523         connected = active = false;
524         resetCachedAddresses();
525     }
526 
527     @Override
528     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
529         if (super.doConnect(remoteAddress, localAddress)) {
530             connected = true;
531             return true;
532         }
533         return false;
534     }
535 
    /**
     * Closes the channel via the parent implementation and then clears the {@code connected} flag.
     * The flag is left untouched if the parent's close throws.
     */
    @Override
    protected void doClose() throws Exception {
        super.doClose();
        connected = false;
    }
541 
    /**
     * Unsafe implementation that reads datagrams from the socket when epoll signals read readiness.
     */
    final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {

        /**
         * Reads datagrams in a loop until a read yields nothing or the allocator handle stops the loop.
         * Each iteration picks one of three strategies: {@code recvmsg} (not connected, or UDP GRO
         * enabled), a plain connected read, or a scattering read via {@code recvmmsg(...)} when more
         * than one datagram fits into the allocated buffer.
         *
         * <p>An exception ends the loop; it is reported through {@code fireExceptionCaught} only after
         * {@code fireChannelReadComplete} has been fired.
         */
        @Override
        void epollInReady() {
            assert eventLoop().inEventLoop();
            EpollDatagramChannelConfig config = config();
            if (shouldBreakEpollInReady(config)) {
                clearEpollIn0();
                return;
            }
            final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            allocHandle.reset(config);

            Throwable exception = null;
            try {
                try {
                    // Sampled once so that the whole read loop uses a consistent view.
                    boolean connected = isConnected();
                    do {
                        final boolean read;
                        int datagramSize = config().getMaxDatagramPayloadSize();

                        ByteBuf byteBuf = allocHandle.allocate(allocator);
                        // Only try to use recvmmsg if it's really supported by the running system.
                        // A datagramSize of 0 means no maximum is configured, so a single datagram is read.
                        int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
                                datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
                                0;
                        try {
                            if (numDatagram <= 1) {
                                if (!connected || config.isUdpGro()) {
                                    read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
                                } else {
                                    read = connectedRead(allocHandle, byteBuf, datagramSize);
                                }
                            } else {
                                // Try to use scattering reads via recvmmsg(...) syscall.
                                read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
                                        byteBuf, datagramSize, numDatagram);
                            }
                        } catch (NativeIoException e) {
                            // On a connected channel, translate errors to match NIO behaviour.
                            if (connected) {
                                throw translateForConnected(e);
                            }
                            throw e;
                        }

                        if (read) {
                            readPending = false;
                        } else {
                            break;
                        }
                    // We use the TRUE_SUPPLIER as it is also ok to read less than what we did try to read (as long
                    // as we read anything).
                    } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
                } catch (Throwable t) {
                    // Remember the failure; it is reported after the read-complete event below.
                    exception = t;
                }

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    pipeline.fireExceptionCaught(exception);
                }
            } finally {
                if (shouldStopReading(config)) {
                    clearEpollIn();
                }
            }
        }
    }
614 
    /**
     * Reads a single datagram from the connected socket into {@code byteBuf} and fires it through the
     * pipeline as a {@link DatagramPacket} addressed from {@code remoteAddress()} to {@code localAddress()}.
     *
     * @param allocHandle           the handle that is informed about attempted and actual bytes read.
     * @param byteBuf               the buffer to read into; released here unless it was passed on in a packet.
     * @param maxDatagramPacketSize the maximum number of bytes to read, or {@code 0} to use all writable bytes.
     * @return {@code true} if a datagram was read and fired, {@code false} if nothing was read.
     */
    private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
            throws Exception {
        try {
            int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
                    : byteBuf.writableBytes();
            allocHandle.attemptedBytesRead(writable);

            int writerIndex = byteBuf.writerIndex();
            int localReadAmount;
            // Read through the raw memory address when available, otherwise through an NIO buffer view.
            if (byteBuf.hasMemoryAddress()) {
                localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
            } else {
                ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
                localReadAmount = socket.recv(buf, buf.position(), buf.limit());
            }

            if (localReadAmount <= 0) {
                allocHandle.lastBytesRead(localReadAmount);

                // nothing was read, release the buffer.
                return false;
            }
            byteBuf.writerIndex(writerIndex + localReadAmount);

            // With a configured maximum size the attempted amount is reported instead of the actual one.
            // NOTE(review): presumably so that a short datagram does not stop the read loop - confirm.
            allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
                    localReadAmount : writable);

            DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
            allocHandle.incMessagesRead(1);

            pipeline().fireChannelRead(packet);
            // The buffer now belongs to the fired packet; null it out so the finally block does not release it.
            byteBuf = null;
            return true;
        } finally {
            if (byteBuf != null) {
                byteBuf.release();
            }
        }
    }
654 
655     private IOException translateForConnected(NativeIoException e) {
656         // We need to correctly translate connect errors to match NIO behaviour.
657         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
658             PortUnreachableException error = new PortUnreachableException(e.getMessage());
659             error.initCause(e);
660             return error;
661         }
662         return e;
663     }
664 
665     private static void addDatagramPacketToOut(DatagramPacket packet,
666                                               RecyclableArrayList out) {
667         if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
668             io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
669                     (io.netty.channel.unix.SegmentedDatagramPacket) packet;
670             ByteBuf content = segmentedDatagramPacket.content();
671             InetSocketAddress recipient = segmentedDatagramPacket.recipient();
672             InetSocketAddress sender = segmentedDatagramPacket.sender();
673             int segmentSize = segmentedDatagramPacket.segmentSize();
674             do {
675                 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
676                         segmentSize)), recipient, sender));
677             } while (content.isReadable());
678 
679             segmentedDatagramPacket.release();
680         } else {
681             out.add(packet);
682         }
683     }
684 
685     private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
686         if (byteBuf != null) {
687             byteBuf.release();
688         }
689         if (packetList != null) {
690             for (int i = 0; i < packetList.size(); i++) {
691                 ReferenceCountUtil.release(packetList.get(i));
692             }
693             packetList.recycle();
694         }
695     }
696 
697     private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
698                                       int bytesRead, DatagramPacket packet) {
699         handle.lastBytesRead(Math.max(1, bytesRead)); // Avoid signalling end-of-data for zero-sized datagrams.
700         handle.incMessagesRead(1);
701         pipeline.fireChannelRead(packet);
702     }
703 
704     private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
705                                           int bytesRead, RecyclableArrayList packetList) {
706         int messagesRead = packetList.size();
707         handle.lastBytesRead(Math.max(1, bytesRead)); // Avoid signalling end-of-data for zero-sized datagrams.
708         handle.incMessagesRead(messagesRead);
709         for (int i = 0; i < messagesRead; i++) {
710             pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
711         }
712     }
713 
    /**
     * Reads a single message via {@code recvmsg} into {@code byteBuf} and fires the resulting
     * packet(s) through the pipeline. A segmented packet is split into individual datagram packets
     * before anything is fired.
     *
     * @param allocHandle the handle that is informed about attempted and actual bytes read.
     * @param array       the native packet array used for the call.
     * @param byteBuf     the buffer to read into; it is always released by this method.
     * @return {@code true} if a message was read and fired, {@code false} if the message has no sender.
     */
    private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
                            NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
        RecyclableArrayList datagramPackets = null;
        try {
            int writable = byteBuf.writableBytes();

            boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
            assert added;

            allocHandle.attemptedBytesRead(writable);

            NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];

            int bytesReceived = socket.recvmsg(msg);
            // A message without a sender is treated as "nothing was read".
            if (!msg.hasSender()) {
                allocHandle.lastBytesRead(-1);
                return false;
            }
            // NOTE(review): sets an absolute writer index, so this presumably relies on the buffer's
            // writerIndex having been 0 before the read - confirm.
            byteBuf.writerIndex(bytesReceived);
            InetSocketAddress local = localAddress();
            DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
            if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
                processPacket(pipeline(), allocHandle, bytesReceived, packet);
            } else {
                // It's important that we process all received data out of the NativeDatagramPacketArray
                // before we call fireChannelRead(...). This is because the user may call flush()
                // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
                datagramPackets = RecyclableArrayList.newInstance();
                addDatagramPacketToOut(packet, datagramPackets);

                processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
                datagramPackets.recycle();
                // Already recycled; null it out so the finally block does not touch the list again.
                datagramPackets = null;
            }

            return true;
        } finally {
            // byteBuf is released on every path.
            // NOTE(review): presumably newDatagramPacket(...) retains its own reference - confirm.
            releaseAndRecycle(byteBuf, datagramPackets);
        }
    }
754 
    /**
     * Reads up to {@code numDatagram} datagrams with one {@code recvmmsg(...)} call. {@code byteBuf}
     * is divided into consecutive regions of {@code datagramSize} bytes, one per datagram, and the
     * received packets are fired through the pipeline.
     *
     * @param allocHandle  the handle that is informed about attempted and actual bytes read.
     * @param array        the native packet array used for the call.
     * @param byteBuf      the buffer to read into; it is always released by this method.
     * @param datagramSize the number of bytes reserved for each datagram.
     * @param numDatagram  the maximum number of datagrams to read.
     * @return {@code true} if at least one datagram was received, {@code false} otherwise.
     */
    private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
            ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
        RecyclableArrayList datagramPackets = null;
        try {
            // Register one writable region per datagram until the array refuses further entries.
            int offset = byteBuf.writerIndex();
            for (int i = 0; i < numDatagram;  i++, offset += datagramSize) {
                if (!array.addWritable(byteBuf, offset, datagramSize)) {
                    break;
                }
            }

            allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());

            NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();

            int received = socket.recvmmsg(packets, 0, array.count());
            if (received == 0) {
                allocHandle.lastBytesRead(-1);
                return false;
            }

            InetSocketAddress local = localAddress();

            // Set the writerIndex to the maximum number of bytes we might have read.
            int bytesReceived = received * datagramSize;
            byteBuf.writerIndex(byteBuf.writerIndex() + bytesReceived);

            if (received == 1) {
                // Single packet fast-path
                DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
                if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
                    processPacket(pipeline(), allocHandle, datagramSize, packet);
                    return true;
                }
                // NOTE(review): a segmented packet falls through and is created again in the loop below;
                // the instance created here is not released in this method - confirm this cannot leak.
            }
            // It's important that we process all received data out of the NativeDatagramPacketArray
            // before we call fireChannelRead(...). This is because the user may call flush()
            // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
            datagramPackets = RecyclableArrayList.newInstance();
            for (int i = 0; i < received; i++) {
                DatagramPacket packet = packets[i].newDatagramPacket(byteBuf, local);

                // We need to skip the maximum datagram size to ensure we have the readerIndex in the right position
                // for the next one.
                byteBuf.skipBytes(datagramSize);
                addDatagramPacketToOut(packet, datagramPackets);
            }
            // As we did use readRetainedSlice(...) before we should now release the byteBuf and null it out.
            byteBuf.release();
            byteBuf = null;

            processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
            datagramPackets.recycle();
            // Already recycled; null it out so the finally block does not touch the list again.
            datagramPackets = null;
            return true;
        } finally {
            // Releases whatever has not been handed over or nulled out above.
            releaseAndRecycle(byteBuf, datagramPackets);
        }
    }
814 
815     private NativeDatagramPacketArray cleanDatagramPacketArray() {
816         return ((NativeArrays) registration().attachment()).cleanDatagramPacketArray();
817     }
818 }