View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.AddressedEnvelope;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.socket.DatagramChannel;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.socket.InternetProtocolFamily;
31  import io.netty.channel.unix.Errors;
32  import io.netty.channel.unix.Errors.NativeIoException;
33  import io.netty.channel.unix.Socket;
34  import io.netty.channel.unix.UnixChannelUtil;
35  import io.netty.util.ReferenceCountUtil;
36  import io.netty.util.UncheckedBooleanSupplier;
37  import io.netty.util.internal.ObjectUtil;
38  import io.netty.util.internal.RecyclableArrayList;
39  import io.netty.util.internal.StringUtil;
40  
41  import java.io.IOException;
42  import java.net.Inet4Address;
43  import java.net.InetAddress;
44  import java.net.InetSocketAddress;
45  import java.net.NetworkInterface;
46  import java.net.PortUnreachableException;
47  import java.net.SocketAddress;
48  import java.nio.ByteBuffer;
49  
50  import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
51  
52  /**
53   * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
54   * maximal performance.
55   */
56  public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
57      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
58      private static final String EXPECTED_TYPES =
59              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
60              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
61              StringUtil.simpleClassName(ByteBuf.class) + ", " +
62              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
63              StringUtil.simpleClassName(ByteBuf.class) + ')';
64  
65      private final EpollDatagramChannelConfig config;
66      private volatile boolean connected;
67  
68      /**
69       * Returns {@code true} if {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported natively.
70       *
71       * @return {@code true} if supported, {@code false} otherwise.
72       */
73      public static boolean isSegmentedDatagramPacketSupported() {
74          return Epoll.isAvailable() &&
75                  // We only support it together with sendmmsg(...)
76                  Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
77      }
78  
79      /**
80       * Create a new instance which selects the {@link InternetProtocolFamily} to use depending
81       * on the Operation Systems default which will be chosen.
82       */
83      public EpollDatagramChannel() {
84          this(null);
85      }
86  
87      /**
88       * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
89       * on the Operation Systems default which will be chosen.
90       */
91      public EpollDatagramChannel(InternetProtocolFamily family) {
92          this(newSocketDgram(family), false);
93      }
94  
95      /**
96       * Create a new instance which selects the {@link InternetProtocolFamily} to use depending
97       * on the Operation Systems default which will be chosen.
98       */
99      public EpollDatagramChannel(int fd) {
100         this(new LinuxSocket(fd), true);
101     }
102 
103     private EpollDatagramChannel(LinuxSocket fd, boolean active) {
104         super(null, fd, active);
105         config = new EpollDatagramChannelConfig(this);
106     }
107 
108     @Override
109     public InetSocketAddress remoteAddress() {
110         return (InetSocketAddress) super.remoteAddress();
111     }
112 
113     @Override
114     public InetSocketAddress localAddress() {
115         return (InetSocketAddress) super.localAddress();
116     }
117 
118     @Override
119     public ChannelMetadata metadata() {
120         return METADATA;
121     }
122 
123     @Override
124     @SuppressWarnings("deprecation")
125     public boolean isActive() {
126         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
127     }
128 
129     @Override
130     public boolean isConnected() {
131         return connected;
132     }
133 
134     @Override
135     public ChannelFuture joinGroup(InetAddress multicastAddress) {
136         return joinGroup(multicastAddress, newPromise());
137     }
138 
139     @Override
140     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
141         try {
142             NetworkInterface iface = config().getNetworkInterface();
143             if (iface == null) {
144                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
145             }
146             return joinGroup(multicastAddress, iface, null, promise);
147         } catch (IOException e) {
148             promise.setFailure(e);
149         }
150         return promise;
151     }
152 
153     @Override
154     public ChannelFuture joinGroup(
155             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
156         return joinGroup(multicastAddress, networkInterface, newPromise());
157     }
158 
159     @Override
160     public ChannelFuture joinGroup(
161             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
162             ChannelPromise promise) {
163         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
164     }
165 
166     @Override
167     public ChannelFuture joinGroup(
168             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
169         return joinGroup(multicastAddress, networkInterface, source, newPromise());
170     }
171 
172     @Override
173     public ChannelFuture joinGroup(
174             final InetAddress multicastAddress, final NetworkInterface networkInterface,
175             final InetAddress source, final ChannelPromise promise) {
176 
177         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
178         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
179 
180         try {
181             socket.joinGroup(multicastAddress, networkInterface, source);
182             promise.setSuccess();
183         } catch (IOException e) {
184             promise.setFailure(e);
185         }
186         return promise;
187     }
188 
189     @Override
190     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
191         return leaveGroup(multicastAddress, newPromise());
192     }
193 
194     @Override
195     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
196         try {
197             return leaveGroup(
198                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
199         } catch (IOException e) {
200             promise.setFailure(e);
201         }
202         return promise;
203     }
204 
205     @Override
206     public ChannelFuture leaveGroup(
207             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
208         return leaveGroup(multicastAddress, networkInterface, newPromise());
209     }
210 
211     @Override
212     public ChannelFuture leaveGroup(
213             InetSocketAddress multicastAddress,
214             NetworkInterface networkInterface, ChannelPromise promise) {
215         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
216     }
217 
218     @Override
219     public ChannelFuture leaveGroup(
220             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
221         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
222     }
223 
224     @Override
225     public ChannelFuture leaveGroup(
226             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
227             final ChannelPromise promise) {
228         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
229         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
230 
231         try {
232             socket.leaveGroup(multicastAddress, networkInterface, source);
233             promise.setSuccess();
234         } catch (IOException e) {
235             promise.setFailure(e);
236         }
237         return promise;
238     }
239 
240     @Override
241     public ChannelFuture block(
242             InetAddress multicastAddress, NetworkInterface networkInterface,
243             InetAddress sourceToBlock) {
244         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
245     }
246 
247     @Override
248     public ChannelFuture block(
249             final InetAddress multicastAddress, final NetworkInterface networkInterface,
250             final InetAddress sourceToBlock, final ChannelPromise promise) {
251         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
252         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
253         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
254 
255         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
256         return promise;
257     }
258 
259     @Override
260     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
261         return block(multicastAddress, sourceToBlock, newPromise());
262     }
263 
264     @Override
265     public ChannelFuture block(
266             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
267         try {
268             return block(
269                     multicastAddress,
270                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
271                     sourceToBlock, promise);
272         } catch (Throwable e) {
273             promise.setFailure(e);
274         }
275         return promise;
276     }
277 
278     @Override
279     protected AbstractEpollUnsafe newUnsafe() {
280         return new EpollDatagramChannelUnsafe();
281     }
282 
283     @Override
284     protected void doBind(SocketAddress localAddress) throws Exception {
285         if (localAddress instanceof InetSocketAddress) {
286             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
287             if (socketAddress.getAddress().isAnyLocalAddress() &&
288                     socketAddress.getAddress() instanceof Inet4Address) {
289                 if (socket.family() == InternetProtocolFamily.IPv6) {
290                     localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
291                 }
292             }
293         }
294         super.doBind(localAddress);
295         active = true;
296     }
297 
298     @Override
299     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
300         int maxMessagesPerWrite = maxMessagesPerWrite();
301         while (maxMessagesPerWrite > 0) {
302             Object msg = in.current();
303             if (msg == null) {
304                 // Wrote all messages.
305                 break;
306             }
307 
308             try {
309                 // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
310                 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
311                         // We only handle UDP_SEGMENT in sendmmsg.
312                         in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
313                     NativeDatagramPacketArray array = cleanDatagramPacketArray();
314                     array.add(in, isConnected(), maxMessagesPerWrite);
315                     int cnt = array.count();
316 
317                     if (cnt >= 1) {
318                         // Try to use gathering writes via sendmmsg(...) syscall.
319                         int offset = 0;
320                         NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
321 
322                         int send = socket.sendmmsg(packets, offset, cnt);
323                         if (send == 0) {
324                             // Did not write all messages.
325                             break;
326                         }
327                         for (int i = 0; i < send; i++) {
328                             in.remove();
329                         }
330                         maxMessagesPerWrite -= send;
331                         continue;
332                     }
333                 }
334                 boolean done = false;
335                 for (int i = config().getWriteSpinCount(); i > 0; --i) {
336                     if (doWriteMessage(msg)) {
337                         done = true;
338                         break;
339                     }
340                 }
341 
342                 if (done) {
343                     in.remove();
344                     maxMessagesPerWrite --;
345                 } else {
346                     break;
347                 }
348             } catch (IOException e) {
349                 maxMessagesPerWrite --;
350                 // Continue on write error as a DatagramChannel can write to multiple remote peers
351                 //
352                 // See https://github.com/netty/netty/issues/2665
353                 in.remove(e);
354             }
355         }
356 
357         if (in.isEmpty()) {
358             // Did write all messages.
359             clearFlag(Native.EPOLLOUT);
360         } else {
361             // Did not write all messages.
362             setFlag(Native.EPOLLOUT);
363         }
364     }
365 
366     private boolean doWriteMessage(Object msg) throws Exception {
367         final ByteBuf data;
368         final InetSocketAddress remoteAddress;
369         if (msg instanceof AddressedEnvelope) {
370             @SuppressWarnings("unchecked")
371             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
372                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
373             data = envelope.content();
374             remoteAddress = envelope.recipient();
375         } else {
376             data = (ByteBuf) msg;
377             remoteAddress = null;
378         }
379 
380         final int dataLen = data.readableBytes();
381         if (dataLen == 0) {
382             return true;
383         }
384 
385         return doWriteOrSendBytes(data, remoteAddress, false) > 0;
386     }
387 
388     @Override
389     protected Object filterOutboundMessage(Object msg) {
390         if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
391             if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
392                 throw new UnsupportedOperationException(
393                         "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
394             }
395             io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
396             ByteBuf content = packet.content();
397             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
398                     packet.replace(newDirectBuffer(packet, content)) : msg;
399         }
400         if (msg instanceof DatagramPacket) {
401             DatagramPacket packet = (DatagramPacket) msg;
402             ByteBuf content = packet.content();
403             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
404                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
405         }
406 
407         if (msg instanceof ByteBuf) {
408             ByteBuf buf = (ByteBuf) msg;
409             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
410         }
411 
412         if (msg instanceof AddressedEnvelope) {
413             @SuppressWarnings("unchecked")
414             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
415             if (e.content() instanceof ByteBuf &&
416                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
417 
418                 ByteBuf content = (ByteBuf) e.content();
419                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
420                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
421                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
422             }
423         }
424 
425         throw new UnsupportedOperationException(
426                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
427     }
428 
429     @Override
430     public EpollDatagramChannelConfig config() {
431         return config;
432     }
433 
434     @Override
435     protected void doDisconnect() throws Exception {
436         socket.disconnect();
437         connected = active = false;
438         resetCachedAddresses();
439     }
440 
441     @Override
442     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
443         if (super.doConnect(remoteAddress, localAddress)) {
444             connected = true;
445             return true;
446         }
447         return false;
448     }
449 
450     @Override
451     protected void doClose() throws Exception {
452         super.doClose();
453         connected = false;
454     }
455 
456     final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
457 
458         @Override
459         void epollInReady() {
460             assert eventLoop().inEventLoop();
461             EpollDatagramChannelConfig config = config();
462             if (shouldBreakEpollInReady(config)) {
463                 clearEpollIn0();
464                 return;
465             }
466             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
467             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
468 
469             final ChannelPipeline pipeline = pipeline();
470             final ByteBufAllocator allocator = config.getAllocator();
471             allocHandle.reset(config);
472             epollInBefore();
473 
474             Throwable exception = null;
475             try {
476                 try {
477                     boolean connected = isConnected();
478                     do {
479                         final boolean read;
480                         int datagramSize = config().getMaxDatagramPayloadSize();
481 
482                         ByteBuf byteBuf = allocHandle.allocate(allocator);
483                         // Only try to use recvmmsg if its really supported by the running system.
484                         int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
485                                 datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
486                                 0;
487                         try {
488                             if (numDatagram <= 1) {
489                                 if (!connected || config.isUdpGro()) {
490                                     read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
491                                 } else {
492                                     read = connectedRead(allocHandle, byteBuf, datagramSize);
493                                 }
494                             } else {
495                                 // Try to use scattering reads via recvmmsg(...) syscall.
496                                 read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
497                                         byteBuf, datagramSize, numDatagram);
498                             }
499                         } catch (NativeIoException e) {
500                             if (connected) {
501                                 throw translateForConnected(e);
502                             }
503                             throw e;
504                         }
505 
506                         if (read) {
507                             readPending = false;
508                         } else {
509                             break;
510                         }
511                     // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
512                     // as we read anything).
513                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
514                 } catch (Throwable t) {
515                     exception = t;
516                 }
517 
518                 allocHandle.readComplete();
519                 pipeline.fireChannelReadComplete();
520 
521                 if (exception != null) {
522                     pipeline.fireExceptionCaught(exception);
523                 }
524             } finally {
525                 epollInFinally(config);
526             }
527         }
528     }
529 
530     private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
531             throws Exception {
532         try {
533             int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
534                     : byteBuf.writableBytes();
535             allocHandle.attemptedBytesRead(writable);
536 
537             int writerIndex = byteBuf.writerIndex();
538             int localReadAmount;
539             if (byteBuf.hasMemoryAddress()) {
540                 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
541             } else {
542                 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
543                 localReadAmount = socket.read(buf, buf.position(), buf.limit());
544             }
545 
546             if (localReadAmount <= 0) {
547                 allocHandle.lastBytesRead(localReadAmount);
548 
549                 // nothing was read, release the buffer.
550                 return false;
551             }
552             byteBuf.writerIndex(writerIndex + localReadAmount);
553 
554             allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
555                     localReadAmount : writable);
556 
557             DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
558             allocHandle.incMessagesRead(1);
559 
560             pipeline().fireChannelRead(packet);
561             byteBuf = null;
562             return true;
563         } finally {
564             if (byteBuf != null) {
565                 byteBuf.release();
566             }
567         }
568     }
569 
570     private IOException translateForConnected(NativeIoException e) {
571         // We need to correctly translate connect errors to match NIO behaviour.
572         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
573             PortUnreachableException error = new PortUnreachableException(e.getMessage());
574             error.initCause(e);
575             return error;
576         }
577         return e;
578     }
579 
580     private static void addDatagramPacketToOut(DatagramPacket packet,
581                                               RecyclableArrayList out) {
582         if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
583             io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
584                     (io.netty.channel.unix.SegmentedDatagramPacket) packet;
585             ByteBuf content = segmentedDatagramPacket.content();
586             InetSocketAddress recipient = segmentedDatagramPacket.recipient();
587             InetSocketAddress sender = segmentedDatagramPacket.sender();
588             int segmentSize = segmentedDatagramPacket.segmentSize();
589             do {
590                 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
591                         segmentSize)), recipient, sender));
592             } while (content.isReadable());
593 
594             segmentedDatagramPacket.release();
595         } else {
596             out.add(packet);
597         }
598     }
599 
600     private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
601         if (byteBuf != null) {
602             byteBuf.release();
603         }
604         if (packetList != null) {
605             for (int i = 0; i < packetList.size(); i++) {
606                 ReferenceCountUtil.release(packetList.get(i));
607             }
608             packetList.recycle();
609         }
610     }
611 
612     private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
613                                       int bytesRead, DatagramPacket packet) {
614         handle.lastBytesRead(bytesRead);
615         handle.incMessagesRead(1);
616         pipeline.fireChannelRead(packet);
617     }
618 
619     private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
620                                           int bytesRead, RecyclableArrayList packetList) {
621         int messagesRead = packetList.size();
622         handle.lastBytesRead(bytesRead);
623         handle.incMessagesRead(messagesRead);
624         for (int i = 0; i < messagesRead; i++) {
625             pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
626         }
627     }
628 
629     private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
630                             NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
631         RecyclableArrayList datagramPackets = null;
632         try {
633             int writable = byteBuf.writableBytes();
634 
635             boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
636             assert added;
637 
638             allocHandle.attemptedBytesRead(writable);
639 
640             NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
641 
642             int bytesReceived = socket.recvmsg(msg);
643             if (bytesReceived == 0) {
644                 allocHandle.lastBytesRead(-1);
645                 return false;
646             }
647             byteBuf.writerIndex(bytesReceived);
648             InetSocketAddress local = localAddress();
649             DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
650             if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
651                 processPacket(pipeline(), allocHandle, bytesReceived, packet);
652                 byteBuf = null;
653             } else {
654                 // Its important that we process all received data out of the NativeDatagramPacketArray
655                 // before we call fireChannelRead(...). This is because the user may call flush()
656                 // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
657                 datagramPackets = RecyclableArrayList.newInstance();
658                 addDatagramPacketToOut(packet, datagramPackets);
659                 // null out byteBuf as addDatagramPacketToOut did take ownership of the ByteBuf / packet and transfered
660                 // it into the RecyclableArrayList.
661                 byteBuf = null;
662 
663                 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
664                 datagramPackets.recycle();
665                 datagramPackets = null;
666             }
667 
668             return true;
669         } finally {
670             releaseAndRecycle(byteBuf, datagramPackets);
671         }
672     }
673 
674     private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
675             ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
676         RecyclableArrayList datagramPackets = null;
677         try {
678             int offset = byteBuf.writerIndex();
679             for (int i = 0; i < numDatagram;  i++, offset += datagramSize) {
680                 if (!array.addWritable(byteBuf, offset, datagramSize)) {
681                     break;
682                 }
683             }
684 
685             allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
686 
687             NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
688 
689             int received = socket.recvmmsg(packets, 0, array.count());
690             if (received == 0) {
691                 allocHandle.lastBytesRead(-1);
692                 return false;
693             }
694             int bytesReceived = received * datagramSize;
695             byteBuf.writerIndex(bytesReceived);
696             InetSocketAddress local = localAddress();
697             if (received == 1) {
698                 // Single packet fast-path
699                 DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
700                 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
701                     processPacket(pipeline(), allocHandle, datagramSize, packet);
702                     byteBuf = null;
703                     return true;
704                 }
705             }
706             // Its important that we process all received data out of the NativeDatagramPacketArray
707             // before we call fireChannelRead(...). This is because the user may call flush()
708             // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
709             datagramPackets = RecyclableArrayList.newInstance();
710             for (int i = 0; i < received; i++) {
711                 DatagramPacket packet = packets[i].newDatagramPacket(byteBuf.readRetainedSlice(datagramSize), local);
712                 addDatagramPacketToOut(packet, datagramPackets);
713             }
714             // Ass we did use readRetainedSlice(...) before we should now release the byteBuf and null it out.
715             byteBuf.release();
716             byteBuf = null;
717 
718             processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
719             datagramPackets.recycle();
720             datagramPackets = null;
721             return true;
722         } finally {
723             releaseAndRecycle(byteBuf, datagramPackets);
724         }
725     }
726 
727     private NativeDatagramPacketArray cleanDatagramPacketArray() {
728         return ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray();
729     }
730 }