View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.AddressedEnvelope;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.socket.DatagramChannel;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.socket.InternetProtocolFamily;
31  import io.netty.channel.socket.SocketProtocolFamily;
32  import io.netty.channel.unix.Errors;
33  import io.netty.channel.unix.Errors.NativeIoException;
34  import io.netty.channel.unix.UnixChannelUtil;
35  import io.netty.util.ReferenceCountUtil;
36  import io.netty.util.UncheckedBooleanSupplier;
37  import io.netty.util.internal.ObjectUtil;
38  import io.netty.util.internal.RecyclableArrayList;
39  import io.netty.util.internal.StringUtil;
40  
41  import java.io.IOException;
42  import java.net.Inet4Address;
43  import java.net.InetAddress;
44  import java.net.InetSocketAddress;
45  import java.net.NetworkInterface;
46  import java.net.PortUnreachableException;
47  import java.net.SocketAddress;
48  import java.nio.ByteBuffer;
49  import java.nio.channels.UnresolvedAddressException;
50  
51  import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
52  
53  /**
54   * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
55   * maximal performance.
56   */
57  public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
58      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
59      private static final String EXPECTED_TYPES =
60              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
61              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
62              StringUtil.simpleClassName(ByteBuf.class) + ", " +
63              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
64              StringUtil.simpleClassName(ByteBuf.class) + ')';
65  
66      private final EpollDatagramChannelConfig config;
67      private volatile boolean connected;
68  
69      /**
70       * Returns {@code true} if {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported natively.
71       *
72       * @return {@code true} if supported, {@code false} otherwise.
73       */
74      public static boolean isSegmentedDatagramPacketSupported() {
75          return Epoll.isAvailable() &&
76                  // We only support it together with sendmmsg(...)
77                  Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
78      }
79  
80      /**
81       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
82       * on the Operation Systems default which will be chosen.
83       */
84      public EpollDatagramChannel() {
85          this((SocketProtocolFamily) null);
86      }
87  
88      /**
89       * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
90       * on the Operation Systems default which will be chosen.
91       *
92       * @deprecated use {@link EpollDatagramChannel#EpollDatagramChannel(SocketProtocolFamily)}
93       */
94      @Deprecated
95      public EpollDatagramChannel(InternetProtocolFamily family) {
96          this(newSocketDgram(family), false);
97      }
98  
99      /**
100      * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used it will depend
101      * on the Operation Systems default which will be chosen.
102      */
103     public EpollDatagramChannel(SocketProtocolFamily family) {
104         this(newSocketDgram(family), false);
105     }
106 
107     /**
108      * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
109      * on the Operation Systems default which will be chosen.
110      */
111     public EpollDatagramChannel(int fd) {
112         this(new LinuxSocket(fd), true);
113     }
114 
115     private EpollDatagramChannel(LinuxSocket fd, boolean active) {
116         super(null, fd, active, EpollIoOps.valueOf(0));
117         config = new EpollDatagramChannelConfig(this);
118     }
119 
120     @Override
121     public InetSocketAddress remoteAddress() {
122         return (InetSocketAddress) super.remoteAddress();
123     }
124 
125     @Override
126     public InetSocketAddress localAddress() {
127         return (InetSocketAddress) super.localAddress();
128     }
129 
130     @Override
131     public ChannelMetadata metadata() {
132         return METADATA;
133     }
134 
135     @Override
136     public boolean isActive() {
137         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
138     }
139 
140     @Override
141     public boolean isConnected() {
142         return connected;
143     }
144 
145     @Override
146     public ChannelFuture joinGroup(InetAddress multicastAddress) {
147         return joinGroup(multicastAddress, newPromise());
148     }
149 
150     @Override
151     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
152         try {
153             NetworkInterface iface = config().getNetworkInterface();
154             if (iface == null) {
155                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
156             }
157             return joinGroup(multicastAddress, iface, null, promise);
158         } catch (IOException e) {
159             promise.setFailure(e);
160         }
161         return promise;
162     }
163 
164     @Override
165     public ChannelFuture joinGroup(
166             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
167         return joinGroup(multicastAddress, networkInterface, newPromise());
168     }
169 
170     @Override
171     public ChannelFuture joinGroup(
172             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
173             ChannelPromise promise) {
174         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
175     }
176 
177     @Override
178     public ChannelFuture joinGroup(
179             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
180         return joinGroup(multicastAddress, networkInterface, source, newPromise());
181     }
182 
183     @Override
184     public ChannelFuture joinGroup(
185             final InetAddress multicastAddress, final NetworkInterface networkInterface,
186             final InetAddress source, final ChannelPromise promise) {
187 
188         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
189         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
190 
191         if (eventLoop().inEventLoop()) {
192             joinGroup0(multicastAddress, networkInterface, source, promise);
193         } else {
194             eventLoop().execute(new Runnable() {
195                 @Override
196                 public void run() {
197                     joinGroup0(multicastAddress, networkInterface, source, promise);
198                 }
199             });
200         }
201         return promise;
202     }
203 
204     private void joinGroup0(
205             final InetAddress multicastAddress, final NetworkInterface networkInterface,
206             final InetAddress source, final ChannelPromise promise) {
207         assert eventLoop().inEventLoop();
208 
209         try {
210             socket.joinGroup(multicastAddress, networkInterface, source);
211             promise.setSuccess();
212         } catch (IOException e) {
213             promise.setFailure(e);
214         }
215     }
216 
217     @Override
218     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
219         return leaveGroup(multicastAddress, newPromise());
220     }
221 
222     @Override
223     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
224         try {
225             return leaveGroup(
226                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
227         } catch (IOException e) {
228             promise.setFailure(e);
229         }
230         return promise;
231     }
232 
233     @Override
234     public ChannelFuture leaveGroup(
235             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
236         return leaveGroup(multicastAddress, networkInterface, newPromise());
237     }
238 
239     @Override
240     public ChannelFuture leaveGroup(
241             InetSocketAddress multicastAddress,
242             NetworkInterface networkInterface, ChannelPromise promise) {
243         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
244     }
245 
246     @Override
247     public ChannelFuture leaveGroup(
248             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
249         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
250     }
251 
252     @Override
253     public ChannelFuture leaveGroup(
254             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
255             final ChannelPromise promise) {
256         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
257         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
258 
259         if (eventLoop().inEventLoop()) {
260             leaveGroup0(multicastAddress, networkInterface, source, promise);
261         } else {
262             eventLoop().execute(new Runnable() {
263                 @Override
264                 public void run() {
265                     leaveGroup0(multicastAddress, networkInterface, source, promise);
266                 }
267             });
268         }
269         return promise;
270     }
271 
272     private void leaveGroup0(
273             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
274             final ChannelPromise promise) {
275         assert eventLoop().inEventLoop();
276 
277         try {
278             socket.leaveGroup(multicastAddress, networkInterface, source);
279             promise.setSuccess();
280         } catch (IOException e) {
281             promise.setFailure(e);
282         }
283     }
284 
285     @Override
286     public ChannelFuture block(
287             InetAddress multicastAddress, NetworkInterface networkInterface,
288             InetAddress sourceToBlock) {
289         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
290     }
291 
292     @Override
293     public ChannelFuture block(
294             final InetAddress multicastAddress, final NetworkInterface networkInterface,
295             final InetAddress sourceToBlock, final ChannelPromise promise) {
296         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
297         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
298         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
299 
300         promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
301         return promise;
302     }
303 
304     @Override
305     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
306         return block(multicastAddress, sourceToBlock, newPromise());
307     }
308 
309     @Override
310     public ChannelFuture block(
311             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
312         try {
313             return block(
314                     multicastAddress,
315                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
316                     sourceToBlock, promise);
317         } catch (Throwable e) {
318             promise.setFailure(e);
319         }
320         return promise;
321     }
322 
323     @Override
324     protected AbstractEpollUnsafe newUnsafe() {
325         return new EpollDatagramChannelUnsafe();
326     }
327 
328     @Override
329     protected void doBind(SocketAddress localAddress) throws Exception {
330         if (localAddress instanceof InetSocketAddress) {
331             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
332             if (socketAddress.getAddress().isAnyLocalAddress() &&
333                     socketAddress.getAddress() instanceof Inet4Address) {
334                 if (socket.family() == SocketProtocolFamily.INET6) {
335                     localAddress = new InetSocketAddress(Native.INET6_ANY, socketAddress.getPort());
336                 }
337             }
338         }
339         super.doBind(localAddress);
340         active = true;
341     }
342 
343     @Override
344     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
345         int maxMessagesPerWrite = maxMessagesPerWrite();
346         while (maxMessagesPerWrite > 0) {
347             Object msg = in.current();
348             if (msg == null) {
349                 // Wrote all messages.
350                 break;
351             }
352 
353             try {
354                 // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
355                 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
356                         // We only handle UDP_SEGMENT in sendmmsg.
357                         in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
358                     NativeDatagramPacketArray array = cleanDatagramPacketArray();
359                     array.add(in, isConnected(), maxMessagesPerWrite);
360                     int cnt = array.count();
361 
362                     if (cnt >= 1) {
363                         // Try to use gathering writes via sendmmsg(...) syscall.
364                         int offset = 0;
365                         NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
366 
367                         int send = socket.sendmmsg(packets, offset, cnt);
368                         if (send == 0) {
369                             // Did not write all messages.
370                             break;
371                         }
372                         for (int i = 0; i < send; i++) {
373                             in.remove();
374                         }
375                         maxMessagesPerWrite -= send;
376                         continue;
377                     }
378                 }
379                 boolean done = false;
380                 for (int i = config().getWriteSpinCount(); i > 0; --i) {
381                     if (doWriteMessage(msg)) {
382                         done = true;
383                         break;
384                     }
385                 }
386 
387                 if (done) {
388                     in.remove();
389                     maxMessagesPerWrite --;
390                 } else {
391                     break;
392                 }
393             } catch (IOException e) {
394                 maxMessagesPerWrite --;
395                 // Continue on write error as a DatagramChannel can write to multiple remote peers
396                 //
397                 // See https://github.com/netty/netty/issues/2665
398                 in.remove(e);
399             }
400         }
401 
402         if (in.isEmpty()) {
403             // Did write all messages.
404             clearFlag(Native.EPOLLOUT);
405         } else {
406             // Did not write all messages.
407             setFlag(Native.EPOLLOUT);
408         }
409     }
410 
411     private boolean doWriteMessage(Object msg) throws Exception {
412         final ByteBuf data;
413         final InetSocketAddress remoteAddress;
414         if (msg instanceof AddressedEnvelope) {
415             @SuppressWarnings("unchecked")
416             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
417                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
418             data = envelope.content();
419             remoteAddress = envelope.recipient();
420         } else {
421             data = (ByteBuf) msg;
422             remoteAddress = null;
423         }
424 
425         final int dataLen = data.readableBytes();
426         if (dataLen == 0) {
427             return true;
428         }
429 
430         return doWriteOrSendBytes(data, remoteAddress, false) > 0;
431     }
432 
433     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
434         if (envelope.recipient() instanceof InetSocketAddress
435                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
436             throw new UnresolvedAddressException();
437         }
438     }
439 
440     @Override
441     protected Object filterOutboundMessage(Object msg) {
442         if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
443             if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
444                 throw new UnsupportedOperationException(
445                         "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
446             }
447             io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
448             checkUnresolved(packet);
449 
450             ByteBuf content = packet.content();
451             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
452                     packet.replace(newDirectBuffer(packet, content)) : msg;
453         }
454         if (msg instanceof DatagramPacket) {
455             DatagramPacket packet = (DatagramPacket) msg;
456             checkUnresolved(packet);
457 
458             ByteBuf content = packet.content();
459             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
460                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
461         }
462 
463         if (msg instanceof ByteBuf) {
464             ByteBuf buf = (ByteBuf) msg;
465             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
466         }
467 
468         if (msg instanceof AddressedEnvelope) {
469             @SuppressWarnings("unchecked")
470             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
471             checkUnresolved(e);
472 
473             if (e.content() instanceof ByteBuf &&
474                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
475 
476                 ByteBuf content = (ByteBuf) e.content();
477                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
478                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
479                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
480             }
481         }
482 
483         throw new UnsupportedOperationException(
484                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
485     }
486 
487     @Override
488     public EpollDatagramChannelConfig config() {
489         return config;
490     }
491 
492     @Override
493     protected void doDisconnect() throws Exception {
494         socket.disconnect();
495         connected = active = false;
496         resetCachedAddresses();
497     }
498 
499     @Override
500     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
501         if (super.doConnect(remoteAddress, localAddress)) {
502             connected = true;
503             return true;
504         }
505         return false;
506     }
507 
508     @Override
509     protected void doClose() throws Exception {
510         super.doClose();
511         connected = false;
512     }
513 
514     final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
515 
516         @Override
517         void epollInReady() {
518             assert eventLoop().inEventLoop();
519             EpollDatagramChannelConfig config = config();
520             if (shouldBreakEpollInReady(config)) {
521                 clearEpollIn0();
522                 return;
523             }
524             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
525             final ChannelPipeline pipeline = pipeline();
526             final ByteBufAllocator allocator = config.getAllocator();
527             allocHandle.reset(config);
528 
529             Throwable exception = null;
530             try {
531                 try {
532                     boolean connected = isConnected();
533                     do {
534                         final boolean read;
535                         int datagramSize = config().getMaxDatagramPayloadSize();
536 
537                         ByteBuf byteBuf = allocHandle.allocate(allocator);
538                         // Only try to use recvmmsg if its really supported by the running system.
539                         int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
540                                 datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
541                                 0;
542                         try {
543                             if (numDatagram <= 1) {
544                                 if (!connected || config.isUdpGro()) {
545                                     read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
546                                 } else {
547                                     read = connectedRead(allocHandle, byteBuf, datagramSize);
548                                 }
549                             } else {
550                                 // Try to use scattering reads via recvmmsg(...) syscall.
551                                 read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
552                                         byteBuf, datagramSize, numDatagram);
553                             }
554                         } catch (NativeIoException e) {
555                             if (connected) {
556                                 throw translateForConnected(e);
557                             }
558                             throw e;
559                         }
560 
561                         if (read) {
562                             readPending = false;
563                         } else {
564                             break;
565                         }
566                     // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
567                     // as we read anything).
568                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
569                 } catch (Throwable t) {
570                     exception = t;
571                 }
572 
573                 allocHandle.readComplete();
574                 pipeline.fireChannelReadComplete();
575 
576                 if (exception != null) {
577                     pipeline.fireExceptionCaught(exception);
578                 }
579             } finally {
580                 if (shouldStopReading(config)) {
581                     clearEpollIn();
582                 }
583             }
584         }
585     }
586 
587     private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
588             throws Exception {
589         try {
590             int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
591                     : byteBuf.writableBytes();
592             allocHandle.attemptedBytesRead(writable);
593 
594             int writerIndex = byteBuf.writerIndex();
595             int localReadAmount;
596             if (byteBuf.hasMemoryAddress()) {
597                 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
598             } else {
599                 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
600                 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
601             }
602 
603             if (localReadAmount <= 0) {
604                 allocHandle.lastBytesRead(localReadAmount);
605 
606                 // nothing was read, release the buffer.
607                 return false;
608             }
609             byteBuf.writerIndex(writerIndex + localReadAmount);
610 
611             allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
612                     localReadAmount : writable);
613 
614             DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
615             allocHandle.incMessagesRead(1);
616 
617             pipeline().fireChannelRead(packet);
618             byteBuf = null;
619             return true;
620         } finally {
621             if (byteBuf != null) {
622                 byteBuf.release();
623             }
624         }
625     }
626 
627     private IOException translateForConnected(NativeIoException e) {
628         // We need to correctly translate connect errors to match NIO behaviour.
629         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
630             PortUnreachableException error = new PortUnreachableException(e.getMessage());
631             error.initCause(e);
632             return error;
633         }
634         return e;
635     }
636 
637     private static void addDatagramPacketToOut(DatagramPacket packet,
638                                               RecyclableArrayList out) {
639         if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
640             io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
641                     (io.netty.channel.unix.SegmentedDatagramPacket) packet;
642             ByteBuf content = segmentedDatagramPacket.content();
643             InetSocketAddress recipient = segmentedDatagramPacket.recipient();
644             InetSocketAddress sender = segmentedDatagramPacket.sender();
645             int segmentSize = segmentedDatagramPacket.segmentSize();
646             do {
647                 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
648                         segmentSize)), recipient, sender));
649             } while (content.isReadable());
650 
651             segmentedDatagramPacket.release();
652         } else {
653             out.add(packet);
654         }
655     }
656 
657     private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
658         if (byteBuf != null) {
659             byteBuf.release();
660         }
661         if (packetList != null) {
662             for (int i = 0; i < packetList.size(); i++) {
663                 ReferenceCountUtil.release(packetList.get(i));
664             }
665             packetList.recycle();
666         }
667     }
668 
669     private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
670                                       int bytesRead, DatagramPacket packet) {
671         handle.lastBytesRead(Math.max(1, bytesRead)); // Avoid signalling end-of-data for zero-sized datagrams.
672         handle.incMessagesRead(1);
673         pipeline.fireChannelRead(packet);
674     }
675 
676     private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
677                                           int bytesRead, RecyclableArrayList packetList) {
678         int messagesRead = packetList.size();
679         handle.lastBytesRead(Math.max(1, bytesRead)); // Avoid signalling end-of-data for zero-sized datagrams.
680         handle.incMessagesRead(messagesRead);
681         for (int i = 0; i < messagesRead; i++) {
682             pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
683         }
684     }
685 
686     private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
687                             NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
688         RecyclableArrayList datagramPackets = null;
689         try {
690             int writable = byteBuf.writableBytes();
691 
692             boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
693             assert added;
694 
695             allocHandle.attemptedBytesRead(writable);
696 
697             NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
698 
699             int bytesReceived = socket.recvmsg(msg);
700             if (!msg.hasSender()) {
701                 allocHandle.lastBytesRead(-1);
702                 return false;
703             }
704             byteBuf.writerIndex(bytesReceived);
705             InetSocketAddress local = localAddress();
706             DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
707             if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
708                 processPacket(pipeline(), allocHandle, bytesReceived, packet);
709             } else {
710                 // Its important that we process all received data out of the NativeDatagramPacketArray
711                 // before we call fireChannelRead(...). This is because the user may call flush()
712                 // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
713                 datagramPackets = RecyclableArrayList.newInstance();
714                 addDatagramPacketToOut(packet, datagramPackets);
715 
716                 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
717                 datagramPackets.recycle();
718                 datagramPackets = null;
719             }
720 
721             return true;
722         } finally {
723             releaseAndRecycle(byteBuf, datagramPackets);
724         }
725     }
726 
727     private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
728             ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
729         RecyclableArrayList datagramPackets = null;
730         try {
731             int offset = byteBuf.writerIndex();
732             for (int i = 0; i < numDatagram;  i++, offset += datagramSize) {
733                 if (!array.addWritable(byteBuf, offset, datagramSize)) {
734                     break;
735                 }
736             }
737 
738             allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
739 
740             NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
741 
742             int received = socket.recvmmsg(packets, 0, array.count());
743             if (received == 0) {
744                 allocHandle.lastBytesRead(-1);
745                 return false;
746             }
747 
748             InetSocketAddress local = localAddress();
749 
750             // Set the writerIndex too the maximum number of bytes we might have read.
751             int bytesReceived = received * datagramSize;
752             byteBuf.writerIndex(byteBuf.writerIndex() + bytesReceived);
753 
754             if (received == 1) {
755                 // Single packet fast-path
756                 DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
757                 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
758                     processPacket(pipeline(), allocHandle, datagramSize, packet);
759                     return true;
760                 }
761             }
762             // Its important that we process all received data out of the NativeDatagramPacketArray
763             // before we call fireChannelRead(...). This is because the user may call flush()
764             // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
765             datagramPackets = RecyclableArrayList.newInstance();
766             for (int i = 0; i < received; i++) {
767                 DatagramPacket packet = packets[i].newDatagramPacket(byteBuf, local);
768 
769                 // We need to skip the maximum datagram size to ensure we have the readerIndex in the right position
770                 // for the next one.
771                 byteBuf.skipBytes(datagramSize);
772                 addDatagramPacketToOut(packet, datagramPackets);
773             }
774             // Ass we did use readRetainedSlice(...) before we should now release the byteBuf and null it out.
775             byteBuf.release();
776             byteBuf = null;
777 
778             processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
779             datagramPackets.recycle();
780             datagramPackets = null;
781             return true;
782         } finally {
783             releaseAndRecycle(byteBuf, datagramPackets);
784         }
785     }
786 
787     private NativeDatagramPacketArray cleanDatagramPacketArray() {
788         return registration().ioHandler().cleanDatagramPacketArray();
789     }
790 }