View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.AddressedEnvelope;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultAddressedEnvelope;
28  import io.netty.channel.socket.DatagramChannel;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.socket.InternetProtocolFamily;
31  import io.netty.channel.unix.Errors;
32  import io.netty.channel.unix.Errors.NativeIoException;
33  import io.netty.channel.unix.UnixChannelUtil;
34  import io.netty.util.ReferenceCountUtil;
35  import io.netty.util.UncheckedBooleanSupplier;
36  import io.netty.util.internal.ObjectUtil;
37  import io.netty.util.internal.RecyclableArrayList;
38  import io.netty.util.internal.StringUtil;
39  
40  import java.io.IOException;
41  import java.net.Inet4Address;
42  import java.net.InetAddress;
43  import java.net.InetSocketAddress;
44  import java.net.NetworkInterface;
45  import java.net.PortUnreachableException;
46  import java.net.SocketAddress;
47  import java.nio.ByteBuffer;
48  import java.nio.channels.UnresolvedAddressException;
49  
50  import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
51  
52  /**
53   * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
54   * maximal performance.
55   */
56  public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
    // Metadata instance returned by metadata(); shared by all channels of this type.
    private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
    // Suffix appended to the "unsupported message type" error thrown by filterOutboundMessage(...).
    // It spells out the message types this channel accepts for writing.
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
            StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
            StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
            StringUtil.simpleClassName(ByteBuf.class) + ')';

    // Configuration bound to this channel; created once in the private constructor.
    private final EpollDatagramChannelConfig config;
    // Set to true by doConnect(...) and reset by doDisconnect() / doClose(); exposed through isConnected().
    private volatile boolean connected;
67  
68      /**
69       * Returns {@code true} if {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported natively.
70       *
71       * @return {@code true} if supported, {@code false} otherwise.
72       */
73      public static boolean isSegmentedDatagramPacketSupported() {
74          return Epoll.isAvailable() &&
75                  // We only support it together with sendmmsg(...)
76                  Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
77      }
78  
79      /**
80       * Create a new instance which selects the {@link InternetProtocolFamily} to use depending
81       * on the Operation Systems default which will be chosen.
82       */
83      public EpollDatagramChannel() {
84          this(null);
85      }
86  
87      /**
88       * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
89       * on the Operation Systems default which will be chosen.
90       */
91      public EpollDatagramChannel(InternetProtocolFamily family) {
92          this(newSocketDgram(family), false);
93      }
94  
    /**
     * Create a new instance that wraps the given file descriptor in a {@link LinuxSocket}. The channel is
     * constructed with its {@code active} flag set to {@code true}.
     *
     * @param fd the file descriptor of the datagram socket to wrap
     */
    public EpollDatagramChannel(int fd) {
        this(new LinuxSocket(fd), true);
    }
102 
103     private EpollDatagramChannel(LinuxSocket fd, boolean active) {
104         super(null, fd, active);
105         config = new EpollDatagramChannelConfig(this);
106     }
107 
108     @Override
109     public InetSocketAddress remoteAddress() {
110         return (InetSocketAddress) super.remoteAddress();
111     }
112 
113     @Override
114     public InetSocketAddress localAddress() {
115         return (InetSocketAddress) super.localAddress();
116     }
117 
118     @Override
119     public ChannelMetadata metadata() {
120         return METADATA;
121     }
122 
123     @Override
124     public boolean isActive() {
125         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
126     }
127 
128     @Override
129     public boolean isConnected() {
130         return connected;
131     }
132 
133     @Override
134     public ChannelFuture joinGroup(InetAddress multicastAddress) {
135         return joinGroup(multicastAddress, newPromise());
136     }
137 
138     @Override
139     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
140         try {
141             NetworkInterface iface = config().getNetworkInterface();
142             if (iface == null) {
143                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
144             }
145             return joinGroup(multicastAddress, iface, null, promise);
146         } catch (IOException e) {
147             promise.setFailure(e);
148         }
149         return promise;
150     }
151 
152     @Override
153     public ChannelFuture joinGroup(
154             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
155         return joinGroup(multicastAddress, networkInterface, newPromise());
156     }
157 
158     @Override
159     public ChannelFuture joinGroup(
160             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
161             ChannelPromise promise) {
162         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
163     }
164 
165     @Override
166     public ChannelFuture joinGroup(
167             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
168         return joinGroup(multicastAddress, networkInterface, source, newPromise());
169     }
170 
171     @Override
172     public ChannelFuture joinGroup(
173             final InetAddress multicastAddress, final NetworkInterface networkInterface,
174             final InetAddress source, final ChannelPromise promise) {
175 
176         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
177         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
178 
179         if (eventLoop().inEventLoop()) {
180             joinGroup0(multicastAddress, networkInterface, source, promise);
181         } else {
182             eventLoop().execute(new Runnable() {
183                 @Override
184                 public void run() {
185                     joinGroup0(multicastAddress, networkInterface, source, promise);
186                 }
187             });
188         }
189         return promise;
190     }
191 
192     private void joinGroup0(
193             final InetAddress multicastAddress, final NetworkInterface networkInterface,
194             final InetAddress source, final ChannelPromise promise) {
195         assert eventLoop().inEventLoop();
196 
197         try {
198             socket.joinGroup(multicastAddress, networkInterface, source);
199             promise.setSuccess();
200         } catch (IOException e) {
201             promise.setFailure(e);
202         }
203     }
204 
205     @Override
206     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
207         return leaveGroup(multicastAddress, newPromise());
208     }
209 
210     @Override
211     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
212         try {
213             return leaveGroup(
214                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
215         } catch (IOException e) {
216             promise.setFailure(e);
217         }
218         return promise;
219     }
220 
221     @Override
222     public ChannelFuture leaveGroup(
223             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
224         return leaveGroup(multicastAddress, networkInterface, newPromise());
225     }
226 
227     @Override
228     public ChannelFuture leaveGroup(
229             InetSocketAddress multicastAddress,
230             NetworkInterface networkInterface, ChannelPromise promise) {
231         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
232     }
233 
234     @Override
235     public ChannelFuture leaveGroup(
236             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
237         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
238     }
239 
240     @Override
241     public ChannelFuture leaveGroup(
242             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
243             final ChannelPromise promise) {
244         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
245         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
246 
247         if (eventLoop().inEventLoop()) {
248             leaveGroup0(multicastAddress, networkInterface, source, promise);
249         } else {
250             eventLoop().execute(new Runnable() {
251                 @Override
252                 public void run() {
253                     leaveGroup0(multicastAddress, networkInterface, source, promise);
254                 }
255             });
256         }
257         return promise;
258     }
259 
260     private void leaveGroup0(
261             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
262             final ChannelPromise promise) {
263         assert eventLoop().inEventLoop();
264 
265         try {
266             socket.leaveGroup(multicastAddress, networkInterface, source);
267             promise.setSuccess();
268         } catch (IOException e) {
269             promise.setFailure(e);
270         }
271     }
272 
273     @Override
274     public ChannelFuture block(
275             InetAddress multicastAddress, NetworkInterface networkInterface,
276             InetAddress sourceToBlock) {
277         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
278     }
279 
280     @Override
281     public ChannelFuture block(
282             final InetAddress multicastAddress, final NetworkInterface networkInterface,
283             final InetAddress sourceToBlock, final ChannelPromise promise) {
284         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
285         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
286         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
287 
288         promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
289         return promise;
290     }
291 
292     @Override
293     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
294         return block(multicastAddress, sourceToBlock, newPromise());
295     }
296 
297     @Override
298     public ChannelFuture block(
299             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
300         try {
301             return block(
302                     multicastAddress,
303                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
304                     sourceToBlock, promise);
305         } catch (Throwable e) {
306             promise.setFailure(e);
307         }
308         return promise;
309     }
310 
311     @Override
312     protected AbstractEpollUnsafe newUnsafe() {
313         return new EpollDatagramChannelUnsafe();
314     }
315 
316     @Override
317     protected void doBind(SocketAddress localAddress) throws Exception {
318         if (localAddress instanceof InetSocketAddress) {
319             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
320             if (socketAddress.getAddress().isAnyLocalAddress() &&
321                     socketAddress.getAddress() instanceof Inet4Address) {
322                 if (socket.family() == InternetProtocolFamily.IPv6) {
323                     localAddress = new InetSocketAddress(Native.INET6_ANY, socketAddress.getPort());
324                 }
325             }
326         }
327         super.doBind(localAddress);
328         active = true;
329     }
330 
    /**
     * Writes pending messages from the outbound buffer to the socket.
     * <p>
     * At most {@code maxMessagesPerWrite()} messages are handled per invocation. Messages are sent as a batch
     * via {@code sendmmsg(...)} when possible and one at a time otherwise. An {@link IOException} raised for a
     * message fails that message only; the loop then carries on with the next one. On exit the
     * {@code EPOLLOUT} flag is cleared if the buffer is empty and set if anything is left to write.
     *
     * @param in the outbound buffer holding the messages to write
     * @throws Exception if writing fails with anything other than an {@link IOException}
     */
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        // Remaining number of messages this invocation is still allowed to handle.
        int maxMessagesPerWrite = maxMessagesPerWrite();
        while (maxMessagesPerWrite > 0) {
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                break;
            }

            try {
                // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
                if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
                        // We only handle UDP_SEGMENT in sendmmsg.
                        in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
                    NativeDatagramPacketArray array = cleanDatagramPacketArray();
                    array.add(in, isConnected(), maxMessagesPerWrite);
                    int cnt = array.count();

                    // If nothing could be added to the array, fall through to the single-message path below.
                    if (cnt >= 1) {
                        // Try to use gathering writes via sendmmsg(...) syscall.
                        int offset = 0;
                        NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();

                        int send = socket.sendmmsg(packets, offset, cnt);
                        if (send == 0) {
                            // Nothing was written; stop and rely on EPOLLOUT being set below.
                            break;
                        }
                        // Drop exactly the messages that were sent from the outbound buffer.
                        for (int i = 0; i < send; i++) {
                            in.remove();
                        }
                        maxMessagesPerWrite -= send;
                        continue;
                    }
                }
                // Single-message path: retry the write up to the configured spin count.
                boolean done = false;
                for (int i = config().getWriteSpinCount(); i > 0; --i) {
                    if (doWriteMessage(msg)) {
                        done = true;
                        break;
                    }
                }

                if (done) {
                    in.remove();
                    maxMessagesPerWrite --;
                } else {
                    // The message could not be written within the spin count; try again later.
                    break;
                }
            } catch (IOException e) {
                // A failed message still counts against the per-invocation budget.
                maxMessagesPerWrite --;
                // Continue on write error as a DatagramChannel can write to multiple remote peers
                //
                // See https://github.com/netty/netty/issues/2665
                in.remove(e);
            }
        }

        if (in.isEmpty()) {
            // Did write all messages.
            clearFlag(Native.EPOLLOUT);
        } else {
            // Did not write all messages.
            setFlag(Native.EPOLLOUT);
        }
    }
398 
399     private boolean doWriteMessage(Object msg) throws Exception {
400         final ByteBuf data;
401         final InetSocketAddress remoteAddress;
402         if (msg instanceof AddressedEnvelope) {
403             @SuppressWarnings("unchecked")
404             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
405                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
406             data = envelope.content();
407             remoteAddress = envelope.recipient();
408         } else {
409             data = (ByteBuf) msg;
410             remoteAddress = null;
411         }
412 
413         final int dataLen = data.readableBytes();
414         if (dataLen == 0) {
415             return true;
416         }
417 
418         return doWriteOrSendBytes(data, remoteAddress, false) > 0;
419     }
420 
421     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
422         if (envelope.recipient() instanceof InetSocketAddress
423                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
424             throw new UnresolvedAddressException();
425         }
426     }
427 
428     @Override
429     protected Object filterOutboundMessage(Object msg) {
430         if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
431             if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
432                 throw new UnsupportedOperationException(
433                         "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
434             }
435             io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
436             checkUnresolved(packet);
437 
438             ByteBuf content = packet.content();
439             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
440                     packet.replace(newDirectBuffer(packet, content)) : msg;
441         }
442         if (msg instanceof DatagramPacket) {
443             DatagramPacket packet = (DatagramPacket) msg;
444             checkUnresolved(packet);
445 
446             ByteBuf content = packet.content();
447             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
448                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
449         }
450 
451         if (msg instanceof ByteBuf) {
452             ByteBuf buf = (ByteBuf) msg;
453             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
454         }
455 
456         if (msg instanceof AddressedEnvelope) {
457             @SuppressWarnings("unchecked")
458             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
459             checkUnresolved(e);
460 
461             if (e.content() instanceof ByteBuf &&
462                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
463 
464                 ByteBuf content = (ByteBuf) e.content();
465                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
466                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
467                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
468             }
469         }
470 
471         throw new UnsupportedOperationException(
472                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
473     }
474 
475     @Override
476     public EpollDatagramChannelConfig config() {
477         return config;
478     }
479 
480     @Override
481     protected void doDisconnect() throws Exception {
482         socket.disconnect();
483         connected = active = false;
484         resetCachedAddresses();
485     }
486 
487     @Override
488     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
489         if (super.doConnect(remoteAddress, localAddress)) {
490             connected = true;
491             return true;
492         }
493         return false;
494     }
495 
496     @Override
497     protected void doClose() throws Exception {
498         super.doClose();
499         connected = false;
500     }
501 
    /**
     * {@code Unsafe} implementation that drives the datagram read loop of the enclosing channel.
     */
    final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {

        /**
         * Reads datagrams until a read attempt yields nothing or the allocation handle declines to continue,
         * then fires {@code channelReadComplete}. A {@link Throwable} raised while reading ends the loop
         * and is forwarded through {@code exceptionCaught} after {@code channelReadComplete} was fired.
         */
        @Override
        void epollInReady() {
            assert eventLoop().inEventLoop();
            EpollDatagramChannelConfig config = config();
            if (shouldBreakEpollInReady(config)) {
                clearEpollIn0();
                return;
            }
            final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
            allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            allocHandle.reset(config);
            epollInBefore();

            Throwable exception = null;
            try {
                try {
                    // Sampled once for the whole loop.
                    boolean connected = isConnected();
                    do {
                        final boolean read;
                        int datagramSize = config().getMaxDatagramPayloadSize();

                        ByteBuf byteBuf = allocHandle.allocate(allocator);
                        // Only try to use recvmmsg if its really supported by the running system.
                        // Without recvmmsg support this is 0; without a configured maximum payload size it is 1;
                        // otherwise it is the number of datagrams of that size that fit into the buffer.
                        int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
                                datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
                                0;
                        try {
                            if (numDatagram <= 1) {
                                // Single datagram: recvmsg(...) unless the channel is connected and UDP_GRO is off.
                                if (!connected || config.isUdpGro()) {
                                    read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
                                } else {
                                    read = connectedRead(allocHandle, byteBuf, datagramSize);
                                }
                            } else {
                                // Try to use scattering reads via recvmmsg(...) syscall.
                                read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
                                        byteBuf, datagramSize, numDatagram);
                            }
                        } catch (NativeIoException e) {
                            // On a connected channel native errors are translated first.
                            if (connected) {
                                throw translateForConnected(e);
                            }
                            throw e;
                        }

                        if (read) {
                            readPending = false;
                        } else {
                            break;
                        }
                    // We use the TRUE_SUPPLIER as it is also ok to read less than what we did try to read (as long
                    // as we read anything).
                    } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
                } catch (Throwable t) {
                    // Remember the failure; it is reported after channelReadComplete below.
                    exception = t;
                }

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    pipeline.fireExceptionCaught(exception);
                }
            } finally {
                epollInFinally(config);
            }
        }
    }
575 
    /**
     * Reads one datagram from the (connected) socket into {@code byteBuf} and fires it through the pipeline as
     * a {@link DatagramPacket} whose recipient is the local and whose sender is the remote address.
     *
     * @param allocHandle           the handle that is told about attempted and actual reads
     * @param byteBuf               the buffer to read into; released here unless it was handed to the pipeline
     * @param maxDatagramPacketSize upper bound for the read, or {@code 0} to use all writable bytes
     * @return {@code true} if a packet was fired, {@code false} if the read returned {@code <= 0}
     */
    private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
            throws Exception {
        try {
            int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
                    : byteBuf.writableBytes();
            allocHandle.attemptedBytesRead(writable);

            int writerIndex = byteBuf.writerIndex();
            int localReadAmount;
            // Read through the memory address when the buffer exposes one, otherwise through its NIO view.
            if (byteBuf.hasMemoryAddress()) {
                localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
            } else {
                ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
                localReadAmount = socket.recv(buf, buf.position(), buf.limit());
            }

            if (localReadAmount <= 0) {
                allocHandle.lastBytesRead(localReadAmount);

                // Nothing was read; the buffer is released in the finally block.
                return false;
            }
            byteBuf.writerIndex(writerIndex + localReadAmount);

            // With a configured maximum packet size the attempted size is reported rather than the actual one.
            // NOTE(review): presumably so a short datagram is not taken as a reason to stop reading - confirm.
            allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
                    localReadAmount : writable);

            DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
            allocHandle.incMessagesRead(1);

            pipeline().fireChannelRead(packet);
            // The buffer now belongs to the fired packet; null it out so the finally block leaves it alone.
            byteBuf = null;
            return true;
        } finally {
            if (byteBuf != null) {
                byteBuf.release();
            }
        }
    }
615 
616     private IOException translateForConnected(NativeIoException e) {
617         // We need to correctly translate connect errors to match NIO behaviour.
618         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
619             PortUnreachableException error = new PortUnreachableException(e.getMessage());
620             error.initCause(e);
621             return error;
622         }
623         return e;
624     }
625 
626     private static void addDatagramPacketToOut(DatagramPacket packet,
627                                               RecyclableArrayList out) {
628         if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
629             io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
630                     (io.netty.channel.unix.SegmentedDatagramPacket) packet;
631             ByteBuf content = segmentedDatagramPacket.content();
632             InetSocketAddress recipient = segmentedDatagramPacket.recipient();
633             InetSocketAddress sender = segmentedDatagramPacket.sender();
634             int segmentSize = segmentedDatagramPacket.segmentSize();
635             do {
636                 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
637                         segmentSize)), recipient, sender));
638             } while (content.isReadable());
639 
640             segmentedDatagramPacket.release();
641         } else {
642             out.add(packet);
643         }
644     }
645 
646     private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
647         if (byteBuf != null) {
648             byteBuf.release();
649         }
650         if (packetList != null) {
651             for (int i = 0; i < packetList.size(); i++) {
652                 ReferenceCountUtil.release(packetList.get(i));
653             }
654             packetList.recycle();
655         }
656     }
657 
658     private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
659                                       int bytesRead, DatagramPacket packet) {
660         handle.lastBytesRead(Math.max(1, bytesRead)); // Avoid signalling end-of-data for zero-sized datagrams.
661         handle.incMessagesRead(1);
662         pipeline.fireChannelRead(packet);
663     }
664 
665     private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
666                                           int bytesRead, RecyclableArrayList packetList) {
667         int messagesRead = packetList.size();
668         handle.lastBytesRead(Math.max(1, bytesRead)); // Avoid signalling end-of-data for zero-sized datagrams.
669         handle.incMessagesRead(messagesRead);
670         for (int i = 0; i < messagesRead; i++) {
671             pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
672         }
673     }
674 
    /**
     * Receives a single message via {@code recvmsg(...)} into {@code byteBuf} and fires the resulting
     * packet(s) through the pipeline. A segmented packet is split into individual datagrams first.
     *
     * @param allocHandle the handle that is told about attempted and actual reads
     * @param array       the packet array used to describe the writable region to the native call
     * @param byteBuf     the buffer to receive into; always released by this method
     * @return {@code true} if a message was received and fired, {@code false} if the received message has no
     *         sender (in which case {@code -1} is recorded as the last bytes read)
     * @throws IOException if the native call fails
     */
    private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
                            NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
        RecyclableArrayList datagramPackets = null;
        try {
            int writable = byteBuf.writableBytes();

            boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
            assert added;

            allocHandle.attemptedBytesRead(writable);

            NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];

            int bytesReceived = socket.recvmsg(msg);
            if (!msg.hasSender()) {
                allocHandle.lastBytesRead(-1);
                return false;
            }
            // NOTE(review): sets an absolute writer index, which assumes the buffer's writer index was 0 when
            // the writable region was registered above - confirm.
            byteBuf.writerIndex(bytesReceived);
            InetSocketAddress local = localAddress();
            DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
            if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
                processPacket(pipeline(), allocHandle, bytesReceived, packet);
            } else {
                // Its important that we process all received data out of the NativeDatagramPacketArray
                // before we call fireChannelRead(...). This is because the user may call flush()
                // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
                datagramPackets = RecyclableArrayList.newInstance();
                addDatagramPacketToOut(packet, datagramPackets);

                processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
                datagramPackets.recycle();
                // Already recycled; null it out so the finally block does not touch the list again.
                datagramPackets = null;
            }

            return true;
        } finally {
            // byteBuf is released on every path, including success.
            // NOTE(review): this implies newDatagramPacket(...) gives the packet its own retained view of the
            // buffer - confirm.
            releaseAndRecycle(byteBuf, datagramPackets);
        }
    }
715 
    /**
     * Receives up to {@code numDatagram} datagrams with one {@code recvmmsg(...)} call, using consecutive
     * {@code datagramSize}-sized regions of {@code byteBuf}, and fires the resulting packets through the
     * pipeline.
     *
     * @param allocHandle  the handle that is told about attempted and actual reads
     * @param array        the packet array used to describe the writable regions to the native call
     * @param byteBuf      the buffer to receive into; released by this method
     * @param datagramSize the size of the region reserved for each datagram
     * @param numDatagram  the maximum number of regions to register
     * @return {@code true} if at least one datagram was received and fired, {@code false} if none was
     *         received (in which case {@code -1} is recorded as the last bytes read)
     * @throws IOException if the native call fails
     */
    private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
            ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
        RecyclableArrayList datagramPackets = null;
        try {
            // Register one region per datagram until the array refuses more or the limit is reached.
            int offset = byteBuf.writerIndex();
            for (int i = 0; i < numDatagram;  i++, offset += datagramSize) {
                if (!array.addWritable(byteBuf, offset, datagramSize)) {
                    break;
                }
            }

            // offset has advanced by datagramSize for every region that was registered successfully.
            allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());

            NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();

            int received = socket.recvmmsg(packets, 0, array.count());
            if (received == 0) {
                allocHandle.lastBytesRead(-1);
                return false;
            }

            InetSocketAddress local = localAddress();

            // Set the writerIndex to the maximum number of bytes we might have read.
            int bytesReceived = received * datagramSize;
            byteBuf.writerIndex(byteBuf.writerIndex() + bytesReceived);

            if (received == 1) {
                // Single packet fast-path
                DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
                if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
                    processPacket(pipeline(), allocHandle, datagramSize, packet);
                    // byteBuf is released by the finally block.
                    return true;
                }
            }
            // Its important that we process all received data out of the NativeDatagramPacketArray
            // before we call fireChannelRead(...). This is because the user may call flush()
            // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
            datagramPackets = RecyclableArrayList.newInstance();
            for (int i = 0; i < received; i++) {
                DatagramPacket packet = packets[i].newDatagramPacket(byteBuf, local);

                // We need to skip the maximum datagram size to ensure we have the readerIndex in the right position
                // for the next one.
                byteBuf.skipBytes(datagramSize);
                addDatagramPacketToOut(packet, datagramPackets);
            }
            // As we used readRetainedSlice(...) before, we should now release the byteBuf and null it out.
            byteBuf.release();
            byteBuf = null;

            processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
            datagramPackets.recycle();
            // Already recycled; null it out so the finally block does not touch the list again.
            datagramPackets = null;
            return true;
        } finally {
            // Releases whatever is still owned here: the buffer and/or packets not yet handed to the pipeline.
            releaseAndRecycle(byteBuf, datagramPackets);
        }
    }
775 
776     private NativeDatagramPacketArray cleanDatagramPacketArray() {
777         return ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray();
778     }
779 }