View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.epoll;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.channel.ChannelException;
21  import io.netty5.channel.ChannelOption;
22  import io.netty5.channel.ChannelShutdownDirection;
23  import io.netty5.channel.FixedRecvBufferAllocator;
24  import io.netty5.channel.RecvBufferAllocator;
25  import io.netty5.channel.socket.DomainSocketAddress;
26  import io.netty5.channel.socket.SocketProtocolFamily;
27  import io.netty5.channel.unix.DomainDatagramSocketAddress;
28  import io.netty5.channel.unix.IntegerUnixChannelOption;
29  import io.netty5.channel.unix.RawUnixChannelOption;
30  import io.netty5.channel.unix.RecvFromAddressDomainSocket;
31  import io.netty5.channel.unix.UnixChannel;
32  import io.netty5.channel.unix.UnixChannelOption;
33  import io.netty5.util.Resource;
34  import io.netty5.channel.AddressedEnvelope;
35  import io.netty5.channel.ChannelMetadata;
36  import io.netty5.channel.ChannelOutboundBuffer;
37  import io.netty5.channel.ChannelPipeline;
38  import io.netty5.channel.DefaultBufferAddressedEnvelope;
39  import io.netty5.channel.EventLoop;
40  import io.netty5.channel.socket.DatagramChannel;
41  import io.netty5.channel.socket.DatagramPacket;
42  import io.netty5.channel.unix.Errors;
43  import io.netty5.channel.unix.Errors.NativeIoException;
44  import io.netty5.channel.unix.SegmentedDatagramPacket;
45  import io.netty5.channel.unix.UnixChannelUtil;
46  import io.netty5.util.concurrent.Future;
47  import io.netty5.util.concurrent.Promise;
48  import io.netty5.util.internal.ObjectUtil;
49  import io.netty5.util.internal.RecyclableArrayList;
50  import io.netty5.util.internal.SilentDispose;
51  import io.netty5.util.internal.StringUtil;
52  import io.netty5.util.internal.logging.InternalLogger;
53  import io.netty5.util.internal.logging.InternalLoggerFactory;
54  
55  import java.io.IOException;
56  import java.net.Inet4Address;
57  import java.net.InetAddress;
58  import java.net.InetSocketAddress;
59  import java.net.NetworkInterface;
60  import java.net.PortUnreachableException;
61  import java.net.ProtocolFamily;
62  import java.net.SocketAddress;
63  import java.net.SocketException;
64  import java.util.Set;
65  import java.util.function.Predicate;
66  
67  import static java.util.Objects.requireNonNull;
68  
69  /**
70   * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
71   * maximal performance.
72   *
73   * <h3>Available options</h3>
74   *
75   * In addition to the options provided by {@link DatagramChannel} and {@link UnixChannel},
76   * {@link EpollDatagramChannel} allows the following options in the option map:
77   *
78   * <table border="1" cellspacing="0" cellpadding="6">
79   * <tr>
80   * <th>{@link ChannelOption}</th>
81   * <th>{@code INET}</th>
82   * <th>{@code INET6}</th>
83   * <th>{@code UNIX}</th>
84   * </tr><tr>
85   * <td>{@link IntegerUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
86   * </tr><tr>
87   * <td>{@link RawUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
88   * </tr><tr>
89   * <td>{@link UnixChannelOption#SO_REUSEPORT}</td><td>X</td><td>X</td><td>X</td>
90   * </tr><tr>
91   * <td>{@link EpollChannelOption#IP_FREEBIND}</td><td>X</td><td>X</td><td>-</td>
92   * </tr><tr>
93   * <td>{@link EpollChannelOption#IP_RECVORIGDSTADDR}</td><td>X</td><td>X</td><td>-</td>
94   * </tr><tr>
95   * <td>{@link EpollChannelOption#MAX_DATAGRAM_PAYLOAD_SIZE}</td><td>X</td><td>X</td><td>-</td>
96   * </tr><tr>
97   * <td>{@link EpollChannelOption#UDP_GRO}</td><td>X</td><td>X</td><td>-</td>
98   * </tr>
99   * </table>
100  */
101 public final class EpollDatagramChannel extends AbstractEpollChannel<UnixChannel> implements DatagramChannel {
102     private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollDatagramChannel.class);
103     private static final ChannelMetadata METADATA = new ChannelMetadata(true);
104     private static final String EXPECTED_TYPES =
105             " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
106             StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
107             StringUtil.simpleClassName(Buffer.class) + ", " +
108             StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
109             StringUtil.simpleClassName(Buffer.class) + ')';
110 
111     private static final String EXPECTED_TYPES_DOMAIN_SOCKET =
112             " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
113                     StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
114                     StringUtil.simpleClassName(Buffer.class) + ", " +
115                     StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
116                     StringUtil.simpleClassName(Buffer.class) + ')';
117     private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
118 
119     private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();
120     private static final Predicate<RecvBufferAllocator.Handle> TRUE_SUPPLIER = h -> true;
121 
122     private volatile boolean activeOnOpen;
123     private volatile int maxDatagramSize;
124     private volatile boolean gro;
125 
126     private volatile boolean connected;
127     private volatile boolean inputShutdown;
128     private volatile boolean outputShutdown;
129 
130     /**
131      * Returns {@code true} if {@link SegmentedDatagramPacket} is supported natively.
132      *
133      * @return {@code true} if supported, {@code false} otherwise.
134      */
135     public static boolean isSegmentedDatagramPacketSupported() {
136         return Epoll.isAvailable() &&
137                 // We only support it together with sendmmsg(...)
138                 Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
139     }
140 
141     /**
142      * Create a new instance which selects the {@link ProtocolFamily} to use depending
143      * on the Operation Systems default which will be chosen.
144      */
145     public EpollDatagramChannel(EventLoop eventLoop) {
146         this(eventLoop, null);
147     }
148 
149     /**
150      * Create a new instance using the given {@link ProtocolFamily}. If {@code null} is used it will depend
151      * on the Operation Systems default which will be chosen.
152      */
153     public EpollDatagramChannel(EventLoop eventLoop, ProtocolFamily family) {
154         this(eventLoop, LinuxSocket.newDatagramSocket(family), false);
155     }
156 
157     /**
158      * Create a new instance which selects the {@link ProtocolFamily} to use depending
159      * on the Operation Systems default which will be chosen.
160      */
161     public EpollDatagramChannel(EventLoop eventLoop, int fd, ProtocolFamily family) {
162         this(eventLoop, new LinuxSocket(fd, SocketProtocolFamily.of(family)), true);
163     }
164 
165     private EpollDatagramChannel(EventLoop eventLoop, LinuxSocket fd, boolean active) {
166         super(null, eventLoop, METADATA, 0, new FixedRecvBufferAllocator(2048), fd, active);
167     }
168 
169     @Override
170     public boolean isActive() {
171         return socket.isOpen() && (getActiveOnOpen() && isRegistered() || active);
172     }
173 
174     @Override
175     public boolean isConnected() {
176         return connected;
177     }
178 
179     private NetworkInterface networkInterface() throws SocketException {
180         NetworkInterface iface = getNetworkInterface();
181         if (iface == null) {
182             SocketAddress localAddress = localAddress();
183             if (localAddress instanceof InetSocketAddress) {
184                 return NetworkInterface.getByInetAddress(((InetSocketAddress) localAddress()).getAddress());
185             }
186         }
187         return null;
188     }
189 
190     @Override
191     public Future<Void> joinGroup(InetAddress multicastAddress) {
192         try {
193             return joinGroup(multicastAddress, networkInterface(), null);
194         } catch (IOException | UnsupportedOperationException e) {
195             return newFailedFuture(e);
196         }
197     }
198 
199     @Override
200     public Future<Void> joinGroup(
201             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
202         requireNonNull(multicastAddress, "multicastAddress");
203         requireNonNull(networkInterface, "networkInterface");
204 
205         if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
206             return newFailedFuture(new UnsupportedOperationException("Multicast not supported"));
207         }
208 
209         Promise<Void> promise = newPromise();
210         if (executor().inEventLoop()) {
211             joinGroup0(multicastAddress, networkInterface, source, promise);
212         } else {
213             executor().execute(() -> joinGroup0(multicastAddress, networkInterface, source, promise));
214         }
215         return promise.asFuture();
216     }
217 
218     private void joinGroup0(InetAddress multicastAddress, NetworkInterface networkInterface,
219                            InetAddress source, Promise<Void> promise) {
220         assertEventLoop();
221 
222         try {
223             socket.joinGroup(multicastAddress, networkInterface, source);
224         } catch (IOException e) {
225             promise.setFailure(e);
226             return;
227         }
228         promise.setSuccess(null);
229     }
230 
231     @Override
232     public Future<Void> leaveGroup(InetAddress multicastAddress) {
233         try {
234             return leaveGroup(multicastAddress, networkInterface(), null);
235         } catch (IOException | UnsupportedOperationException e) {
236             return newFailedFuture(e);
237         }
238     }
239 
240     @Override
241     public Future<Void> leaveGroup(
242             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
243         requireNonNull(multicastAddress, "multicastAddress");
244         requireNonNull(networkInterface, "networkInterface");
245 
246         if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
247             return newFailedFuture(new UnsupportedOperationException("Multicast not supported"));
248         }
249 
250         Promise<Void> promise = newPromise();
251         if (executor().inEventLoop()) {
252             leaveGroup0(multicastAddress, networkInterface, source, promise);
253         } else {
254             executor().execute(() -> leaveGroup0(multicastAddress, networkInterface, source, promise));
255         }
256         return promise.asFuture();
257     }
258 
259     private void leaveGroup0(
260             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
261             final Promise<Void> promise) {
262         assert executor().inEventLoop();
263 
264         try {
265             socket.leaveGroup(multicastAddress, networkInterface, source);
266         } catch (IOException e) {
267             promise.setFailure(e);
268             return;
269         }
270         promise.setSuccess(null);
271     }
272 
273     @Override
274     public Future<Void> block(
275             InetAddress multicastAddress, NetworkInterface networkInterface,
276             InetAddress sourceToBlock) {
277         requireNonNull(multicastAddress, "multicastAddress");
278         requireNonNull(sourceToBlock, "sourceToBlock");
279         requireNonNull(networkInterface, "networkInterface");
280         return newFailedFuture(new UnsupportedOperationException("Multicast block not supported"));
281     }
282 
283     @Override
284     public Future<Void> block(
285             InetAddress multicastAddress, InetAddress sourceToBlock) {
286         try {
287             return block(
288                     multicastAddress,
289                     networkInterface(),
290                     sourceToBlock);
291         } catch (IOException | UnsupportedOperationException e) {
292             return newFailedFuture(e);
293         }
294     }
295 
296     @Override
297     protected void doShutdown(ChannelShutdownDirection direction) {
298         switch (direction) {
299             case Inbound:
300                 inputShutdown = true;
301                 break;
302             case Outbound:
303                 outputShutdown = true;
304                 break;
305             default:
306                 throw new IllegalStateException();
307         }
308     }
309 
310     @Override
311     public boolean isShutdown(ChannelShutdownDirection direction) {
312         if (!isActive()) {
313             return true;
314         }
315         switch (direction) {
316             case Inbound:
317                 return inputShutdown;
318             case Outbound:
319                 return outputShutdown;
320             default:
321                 throw new AssertionError();
322         }
323     }
324 
325     @Override
326     protected void doBind(SocketAddress localAddress) throws Exception {
327         if (localAddress instanceof InetSocketAddress) {
328             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
329             if (socketAddress.getAddress().isAnyLocalAddress() &&
330                     socketAddress.getAddress() instanceof Inet4Address) {
331                 if (socket.protocolFamily() == SocketProtocolFamily.INET6) {
332                     localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
333                 }
334             }
335         }
336         super.doBind(localAddress);
337         active = true;
338     }
339 
340     @Override
341     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
342         int maxMessagesPerWrite = getMaxMessagesPerWrite();
343         while (maxMessagesPerWrite > 0) {
344             Object msg = in.current();
345             if (msg == null) {
346                 // Wrote all messages.
347                 break;
348             }
349 
350             try {
351                 // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
352                 if (Native.IS_SUPPORTING_SENDMMSG && socket.protocolFamily() != SocketProtocolFamily.UNIX &&
353                         in.size() > 1 ||
354                         // We only handle UDP_SEGMENT in sendmmsg.
355                         in.current() instanceof SegmentedDatagramPacket) {
356                     NativeDatagramPacketArray array = cleanDatagramPacketArray();
357                     array.add(in, isConnected(), maxMessagesPerWrite);
358                     int cnt = array.count();
359 
360                     if (cnt >= 1) {
361                         // Try to use gathering writes via sendmmsg(...) syscall.
362                         int offset = 0;
363                         NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
364 
365                         int send = socket.sendmmsg(packets, offset, cnt);
366                         if (send == 0) {
367                             // Did not write all messages.
368                             break;
369                         }
370                         for (int i = 0; i < send; i++) {
371                             in.remove();
372                         }
373                         maxMessagesPerWrite -= send;
374                         continue;
375                     }
376                 }
377                 boolean done = false;
378                 for (int i = getWriteSpinCount(); i > 0; --i) {
379                     if (doWriteMessage(msg)) {
380                         done = true;
381                         break;
382                     }
383                 }
384 
385                 if (done) {
386                     in.remove();
387                     maxMessagesPerWrite --;
388                 } else {
389                     break;
390                 }
391             } catch (IOException e) {
392                 maxMessagesPerWrite --;
393                 // Continue on write error as a DatagramChannel can write to multiple remote peers
394                 //
395                 // See https://github.com/netty/netty/issues/2665
396                 in.remove(e);
397             }
398         }
399 
400         if (in.isEmpty()) {
401             // Did write all messages.
402             clearFlag(Native.EPOLLOUT);
403         } else {
404             // Did not write all messages.
405             setFlag(Native.EPOLLOUT);
406         }
407     }
408 
409     private boolean doWriteMessage(Object msg) throws Exception {
410         final Buffer data;
411         final SocketAddress remoteAddress;
412         if (msg instanceof AddressedEnvelope) {
413             @SuppressWarnings("unchecked")
414             AddressedEnvelope<?, SocketAddress> envelope = (AddressedEnvelope<?, SocketAddress>) msg;
415             data = (Buffer) envelope.content();
416             remoteAddress = envelope.recipient();
417         } else {
418             data = (Buffer) msg;
419             remoteAddress = null;
420         }
421 
422         if (data.readableBytes() == 0) {
423             return true;
424         }
425         return doWriteOrSendBytes(data, remoteAddress, false) > 0;
426     }
427 
428     @Override
429     protected Object filterOutboundMessage(Object msg) {
430         if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
431             return filterOutboundMessage0(msg, DomainSocketAddress.class, EXPECTED_TYPES_DOMAIN_SOCKET);
432         }
433         return filterOutboundMessage0(msg, InetSocketAddress.class, EXPECTED_TYPES);
434     }
435 
436     private Object filterOutboundMessage0(Object msg, Class<? extends SocketAddress> recipientClass,
437                                           String expectedTypes) {
438         if (msg instanceof SegmentedDatagramPacket) {
439             if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
440                 throw new UnsupportedOperationException(
441                         "Unsupported message type: " + StringUtil.simpleClassName(msg) + expectedTypes);
442             }
443             SegmentedDatagramPacket packet = (SegmentedDatagramPacket) msg;
444             if (recipientClass.isInstance(packet.recipient())) {
445                 Buffer content = packet.content();
446                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
447                         packet.replace(newDirectBuffer(packet, content)) : msg;
448             }
449         } else if (msg instanceof DatagramPacket) {
450             DatagramPacket packet = (DatagramPacket) msg;
451             if (recipientClass.isInstance(packet.recipient())) {
452                 Buffer content = packet.content();
453                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
454                         new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
455             }
456         } else if (msg instanceof Buffer) {
457             Buffer buf = (Buffer) msg;
458             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
459         } else if (msg instanceof AddressedEnvelope) {
460             @SuppressWarnings("unchecked")
461             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
462             if (recipientClass.isInstance(e.recipient())) {
463                 InetSocketAddress recipient = (InetSocketAddress) e.recipient();
464                 Object content = e.content();
465                 if (content instanceof Buffer) {
466                     Buffer buf = (Buffer) content;
467                     if (UnixChannelUtil.isBufferCopyNeededForWrite(buf)) {
468                         try {
469                             return new DefaultBufferAddressedEnvelope<>(newDirectBuffer(buf), recipient);
470                         } finally {
471                             SilentDispose.dispose(e, logger); // Don't fail here, because we allocated a buffer.
472                         }
473                     }
474                     return e;
475                 }
476             }
477         }
478 
479         throw new UnsupportedOperationException(
480                 "unsupported message type: " + StringUtil.simpleClassName(msg) + expectedTypes);
481     }
482 
483     @Override
484     protected void doDisconnect() throws Exception {
485         socket.disconnect();
486         connected = active = false;
487         resetCachedAddresses();
488     }
489 
490     @Override
491     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
492         if (super.doConnect(remoteAddress, localAddress)) {
493             connected = true;
494             return true;
495         }
496         return false;
497     }
498 
499     @Override
500     protected void doClose() throws Exception {
501         super.doClose();
502         connected = false;
503     }
504 
505     @Override
506     protected void epollInReady(RecvBufferAllocator.Handle handle, BufferAllocator recvBufferAllocator,
507                                 boolean receivedRdHup) {
508         final ChannelPipeline pipeline = pipeline();
509         Throwable exception = socket.protocolFamily() == SocketProtocolFamily.UNIX ?
510                 doReadBufferDomainSocket(handle, recvBufferAllocator) : doReadBuffer(handle, recvBufferAllocator);
511         handle.readComplete();
512         pipeline.fireChannelReadComplete();
513 
514         if (exception != null) {
515             pipeline.fireChannelExceptionCaught(exception);
516         }
517         readIfIsAutoRead();
518     }
519 
520     @Override
521     protected boolean maybeMoreDataToRead(RecvBufferAllocator.Handle handle) {
522         return handle.lastBytesRead() > 0;
523     }
524 
525     private Throwable doReadBufferDomainSocket(RecvBufferAllocator.Handle allocHandle,
526                                                BufferAllocator allocator) {
527         Buffer buf = null;
528         try {
529             boolean connected = isConnected();
530             do {
531                 buf = allocHandle.allocate(allocator);
532                 allocHandle.attemptedBytesRead(buf.writableBytes());
533 
534                 final DatagramPacket packet;
535                 if (connected) {
536                     doReadBytes(buf);
537                     if (allocHandle.lastBytesRead() <= 0) {
538                         // nothing was read, release the buffer.
539                         buf.close();
540                         break;
541                     }
542                     packet = new DatagramPacket(buf, localAddress(), remoteAddress());
543                 } else {
544                     final RecvFromAddressDomainSocket recvFrom = new RecvFromAddressDomainSocket(socket);
545                     buf.forEachWritable(0, recvFrom);
546                     final DomainDatagramSocketAddress remoteAddress = recvFrom.remoteAddress();
547 
548                     if (remoteAddress == null) {
549                         allocHandle.lastBytesRead(-1);
550                         buf.close();
551                         break;
552                     }
553                     DomainSocketAddress localAddress = remoteAddress.localAddress();
554                     if (localAddress == null) {
555                         localAddress = (DomainSocketAddress) localAddress();
556                     }
557                     allocHandle.lastBytesRead(remoteAddress.receivedAmount());
558                     buf.skipWritableBytes(allocHandle.lastBytesRead());
559 
560                     packet = new DatagramPacket(buf, localAddress, remoteAddress);
561                 }
562 
563                 allocHandle.incMessagesRead(1);
564 
565                 readPending = false;
566                 pipeline().fireChannelRead(packet);
567 
568                 buf = null;
569 
570                 // We use the TRUE_SUPPLIER as it is also ok to read less than what we did try to read (as long
571                 // as we read anything).
572             } while (allocHandle.continueReading(isAutoRead(), TRUE_SUPPLIER));
573         } catch (Throwable t) {
574             if (buf != null) {
575                 buf.close();
576             }
577             return t;
578         }
579         return null;
580     }
581 
582     private Throwable doReadBuffer(RecvBufferAllocator.Handle allocHandle, BufferAllocator allocator) {
583         try {
584             boolean connected = isConnected();
585             do {
586                 final boolean read;
587                 int datagramSize = getMaxDatagramPayloadSize();
588 
589                 Buffer buf = allocHandle.allocate(allocator);
590                 // Only try to use recvmmsg if its really supported by the running system.
591                 int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
592                         datagramSize == 0 ? 1 : buf.writableBytes() / datagramSize :
593                         0;
594                 try {
595                     if (numDatagram <= 1) {
596                         if (!connected || isUdpGro()) {
597                             read = recvmsg(allocHandle, allocator, cleanDatagramPacketArray(), buf);
598                         } else {
599                             read = connectedRead(allocHandle, buf, datagramSize);
600                         }
601                     } else {
602                         // Try to use scattering reads via recvmmsg(...) syscall.
603                         read = scatteringRead(allocHandle, allocator, cleanDatagramPacketArray(),
604                                               buf, datagramSize, numDatagram);
605                     }
606                 } catch (NativeIoException e) {
607                     if (connected) {
608                         throw translateForConnected(e);
609                     }
610                     throw e;
611                 }
612 
613                 if (read) {
614                     readPending = false;
615                 } else {
616                     break;
617                 }
618             // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
619             // as we read anything).
620             } while (allocHandle.continueReading(isAutoRead(), TRUE_SUPPLIER));
621         } catch (Throwable t) {
622             return t;
623         }
624         return null;
625     }
626 
627     private boolean connectedRead(RecvBufferAllocator.Handle allocHandle, Buffer buf,
628                                   int maxDatagramPacketSize) throws Exception {
629         try {
630             int writable = maxDatagramPacketSize != 0 ? Math.min(buf.writableBytes(), maxDatagramPacketSize)
631                     : buf.writableBytes();
632             allocHandle.attemptedBytesRead(writable);
633 
634             int initialWritableBytes = buf.writableBytes();
635             buf.forEachWritable(0, (index, component) -> {
636                 long address = component.writableNativeAddress();
637                 assert address != 0;
638                 int bytesRead = socket.readAddress(address, 0, component.writableBytes());
639                 allocHandle.lastBytesRead(bytesRead);
640                 if (bytesRead <= 0) {
641                     return false;
642                 }
643                 component.skipWritableBytes(bytesRead);
644                 return true;
645             });
646             final int totalBytesRead = initialWritableBytes - buf.writableBytes();
647             if (totalBytesRead == 0) {
648                 // nothing was read, release the buffer.
649                 return false;
650             }
651             if (maxDatagramPacketSize > 0) {
652                 allocHandle.lastBytesRead(totalBytesRead);
653             }
654             DatagramPacket packet = new DatagramPacket(buf, localAddress(), remoteAddress());
655             allocHandle.incMessagesRead(1);
656 
657             pipeline().fireChannelRead(packet);
658             buf = null;
659             return true;
660         } finally {
661             if (buf != null) {
662                 buf.close();
663             }
664         }
665     }
666 
667     private IOException translateForConnected(NativeIoException e) {
668         // We need to correctly translate connect errors to match NIO behaviour.
669         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
670             PortUnreachableException error = new PortUnreachableException(e.getMessage());
671             error.initCause(e);
672             return error;
673         }
674         return e;
675     }
676 
677     private static void addDatagramPacketToOut(AddressedEnvelope<?, ?> packet, RecyclableArrayList out) {
678         if (packet instanceof SegmentedDatagramPacket) {
679             try (SegmentedDatagramPacket segmentedDatagramPacket = (SegmentedDatagramPacket) packet) {
680                 Buffer content = segmentedDatagramPacket.content();
681                 SocketAddress recipient = segmentedDatagramPacket.recipient();
682                 SocketAddress sender = segmentedDatagramPacket.sender();
683                 int segmentSize = segmentedDatagramPacket.segmentSize();
684                 do {
685                     out.add(new DatagramPacket(content.readSplit(segmentSize), recipient, sender));
686                 } while (content.readableBytes() > 0);
687             }
688         } else {
689             out.add(packet);
690         }
691     }
692 
693     private static void releaseAndRecycle(Object buffer, RecyclableArrayList packetList) {
694         Resource.dispose(buffer);
695         if (packetList != null) {
696             for (int i = 0; i < packetList.size(); i++) {
697                 Resource.dispose(packetList.get(i));
698             }
699             packetList.recycle();
700         }
701     }
702 
703     private static void processPacket(ChannelPipeline pipeline, RecvBufferAllocator.Handle handle,
704                                       int bytesRead, AddressedEnvelope<?, ?> packet) {
705         handle.lastBytesRead(bytesRead);
706         handle.incMessagesRead(1);
707         pipeline.fireChannelRead(packet);
708     }
709 
710     private static void processPacketList(ChannelPipeline pipeline, RecvBufferAllocator.Handle handle,
711                                           BufferAllocator allocator, int bytesRead, RecyclableArrayList packetList) {
712         int messagesRead = packetList.size();
713         handle.lastBytesRead(bytesRead);
714         handle.incMessagesRead(messagesRead);
715         for (int i = 0; i < messagesRead; i++) {
716             pipeline.fireChannelRead(packetList.set(i, allocator.allocate(0)));
717         }
718     }
719 
720     private boolean recvmsg(RecvBufferAllocator.Handle allocHandle, BufferAllocator allocator,
721                             NativeDatagramPacketArray array, Buffer buf) throws IOException {
722         RecyclableArrayList datagramPackets = null;
723         try {
724             int initialWriterOffset = buf.writerOffset();
725 
726             boolean added = array.addWritable(buf, 0, null);
727             assert added;
728 
729             allocHandle.attemptedBytesRead(buf.writerOffset() - initialWriterOffset);
730 
731             NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
732 
733             int bytesReceived = socket.recvmsg(msg);
734             if (bytesReceived == 0) {
735                 allocHandle.lastBytesRead(-1);
736                 return false;
737             }
738             buf.writerOffset(initialWriterOffset + bytesReceived);
739             InetSocketAddress local = (InetSocketAddress) localAddress();
740             DatagramPacket packet = msg.newDatagramPacket(buf, local);
741             if (!(packet instanceof SegmentedDatagramPacket)) {
742                 processPacket(pipeline(), allocHandle, bytesReceived, packet);
743                 buf = null;
744             } else {
745                 // Its important we process all received data out of the NativeDatagramPacketArray
746                 // before we call fireChannelRead(...). This is because the user may call flush()
747                 // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
748                 datagramPackets = RecyclableArrayList.newInstance();
749                 addDatagramPacketToOut(packet, datagramPackets);
750                 // null out buf as addDatagramPacketToOut did take ownership of the Buffer / packet and transferred
751                 // it into the RecyclableArrayList.
752                 buf = null;
753 
754                 processPacketList(pipeline(), allocHandle, allocator, bytesReceived, datagramPackets);
755                 datagramPackets.recycle();
756                 datagramPackets = null;
757             }
758 
759             return true;
760         } finally {
761             releaseAndRecycle(buf, datagramPackets);
762         }
763     }
764 
765     private boolean scatteringRead(RecvBufferAllocator.Handle allocHandle, BufferAllocator allocator,
766                                    NativeDatagramPacketArray array, Buffer buf, int datagramSize, int numDatagram)
767             throws IOException {
768         RecyclableArrayList datagramPackets = null;
769         try {
770             int initialWriterOffset = buf.writerOffset();
771             for (int i = 0; i < numDatagram; i++) {
772                 if (!array.addWritable(buf, datagramSize, null)) {
773                     break;
774                 }
775             }
776 
777             allocHandle.attemptedBytesRead(buf.writerOffset() - initialWriterOffset);
778 
779             NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
780 
781             int received = socket.recvmmsg(packets, 0, array.count());
782             if (received == 0) {
783                 allocHandle.lastBytesRead(-1);
784                 return false;
785             }
786             int bytesReceived = received * datagramSize;
787             buf.writerOffset(initialWriterOffset + bytesReceived);
788             InetSocketAddress local = (InetSocketAddress) localAddress();
789             if (received == 1) {
790                 // Single packet fast-path
791                 DatagramPacket packet = packets[0].newDatagramPacket(buf, local);
792                 if (!(packet instanceof SegmentedDatagramPacket)) {
793                     processPacket(pipeline(), allocHandle, datagramSize, packet);
794                     buf = null;
795                     return true;
796                 }
797             }
798             // It's important we process all received data out of the NativeDatagramPacketArray
799             // before we call fireChannelRead(...). This is because the user may call flush()
800             // in a channelRead(...) method and so may re-use the NativeDatagramPacketArray again.
801             datagramPackets = RecyclableArrayList.newInstance();
802             for (int i = 0; i < received; i++) {
803                 DatagramPacket packet = packets[i].newDatagramPacket(buf.readSplit(datagramSize), local);
804                 addDatagramPacketToOut(packet, datagramPackets);
805             }
806             // Since we used readSplit(...) before, we should now release the buffer and null it out.
807             buf.close();
808             buf = null;
809 
810             processPacketList(pipeline(), allocHandle, allocator, bytesReceived, datagramPackets);
811             datagramPackets.recycle();
812             datagramPackets = null;
813             return true;
814         } finally {
815             releaseAndRecycle(buf, datagramPackets);
816         }
817     }
818 
819     private NativeDatagramPacketArray cleanDatagramPacketArray() {
820         return registration().cleanDatagramPacketArray();
821     }
822 
823     @SuppressWarnings("unchecked")
824     @Override
825     protected <T> T getExtendedOption(ChannelOption<T> option) {
826         if (isOptionSupported(socket.protocolFamily(), option)) {
827             if (option == ChannelOption.SO_BROADCAST) {
828                 return (T) Boolean.valueOf(isBroadcast());
829             }
830             if (option == ChannelOption.SO_RCVBUF) {
831                 return (T) Integer.valueOf(getReceiveBufferSize());
832             }
833             if (option == ChannelOption.SO_SNDBUF) {
834                 return (T) Integer.valueOf(getSendBufferSize());
835             }
836             if (option == ChannelOption.SO_REUSEADDR) {
837                 return (T) Boolean.valueOf(isReuseAddress());
838             }
839             if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) {
840                 return (T) Boolean.valueOf(isLoopbackModeDisabled());
841             }
842             if (option == ChannelOption.IP_MULTICAST_IF) {
843                 return (T) getNetworkInterface();
844             }
845             if (option == ChannelOption.IP_MULTICAST_TTL) {
846                 return (T) Integer.valueOf(getTimeToLive());
847             }
848             if (option == ChannelOption.IP_TOS) {
849                 return (T) Integer.valueOf(getTrafficClass());
850             }
851             if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
852                 return (T) Boolean.valueOf(activeOnOpen);
853             }
854             if (option == UnixChannelOption.SO_REUSEPORT) {
855                 return (T) Boolean.valueOf(isReusePort());
856             }
857             if (option == EpollChannelOption.IP_TRANSPARENT) {
858                 return (T) Boolean.valueOf(isIpTransparent());
859             }
860             if (option == EpollChannelOption.IP_FREEBIND) {
861                 return (T) Boolean.valueOf(isFreeBind());
862             }
863             if (option == EpollChannelOption.IP_RECVORIGDSTADDR) {
864                 return (T) Boolean.valueOf(isIpRecvOrigDestAddr());
865             }
866             if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
867                 return (T) Integer.valueOf(getMaxDatagramPayloadSize());
868             }
869             if (option == EpollChannelOption.UDP_GRO) {
870                 return (T) Boolean.valueOf(isUdpGro());
871             }
872         }
873         return super.getExtendedOption(option);
874     }
875 
876     @Override
877     protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
878         if (isOptionSupported(socket.protocolFamily(), option)) {
879             if (option == ChannelOption.SO_BROADCAST) {
880                 setBroadcast((Boolean) value);
881             } else if (option == ChannelOption.SO_RCVBUF) {
882                 setReceiveBufferSize((Integer) value);
883             } else if (option == ChannelOption.SO_SNDBUF) {
884                 setSendBufferSize((Integer) value);
885             } else if (option == ChannelOption.SO_REUSEADDR) {
886                 setReuseAddress((Boolean) value);
887             } else if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) {
888                 setLoopbackModeDisabled((Boolean) value);
889             } else if (option == ChannelOption.IP_MULTICAST_IF) {
890                 setNetworkInterface((NetworkInterface) value);
891             } else if (option == ChannelOption.IP_MULTICAST_TTL) {
892                 setTimeToLive((Integer) value);
893             } else if (option == ChannelOption.IP_TOS) {
894                 setTrafficClass((Integer) value);
895             } else if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
896                 setActiveOnOpen((Boolean) value);
897             } else if (option == UnixChannelOption.SO_REUSEPORT) {
898                 setReusePort((Boolean) value);
899             } else if (option == EpollChannelOption.IP_FREEBIND) {
900                 setFreeBind((Boolean) value);
901             } else if (option == EpollChannelOption.IP_TRANSPARENT) {
902                 setIpTransparent((Boolean) value);
903             } else if (option == EpollChannelOption.IP_RECVORIGDSTADDR) {
904                 setIpRecvOrigDestAddr((Boolean) value);
905             } else if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
906                 setMaxDatagramPayloadSize((Integer) value);
907             } else if (option == EpollChannelOption.UDP_GRO) {
908                 setUdpGro((Boolean) value);
909             }
910         } else {
911             super.setExtendedOption(option, value);
912         }
913     }
914 
915     private static Set<ChannelOption<?>> supportedOptions() {
916         return newSupportedIdentityOptionsSet(
917                 ChannelOption.SO_BROADCAST, ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF,
918                 ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED,
919                 ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, ChannelOption.IP_TOS,
920                 ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, UnixChannelOption.SO_REUSEPORT,
921                 EpollChannelOption.IP_FREEBIND, EpollChannelOption.IP_TRANSPARENT,
922                 EpollChannelOption.IP_RECVORIGDSTADDR, EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE,
923                 EpollChannelOption.UDP_GRO);
924     }
925 
926     private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
927         return newSupportedIdentityOptionsSet(ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF,
928                 ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION);
929     }
930 
931     private static boolean isOptionSupported(SocketProtocolFamily family, ChannelOption<?> option) {
932         if (family == SocketProtocolFamily.UNIX) {
933             return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
934         }
935         return SUPPORTED_OPTIONS.contains(option);
936     }
937 
938     @Override
939     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
940         return isOptionSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
941     }
942 
943     private void setActiveOnOpen(boolean activeOnOpen) {
944         if (isRegistered()) {
945             throw new IllegalStateException("Can only changed before channel was registered");
946         }
947         this.activeOnOpen = activeOnOpen;
948     }
949 
950     boolean getActiveOnOpen() {
951         return activeOnOpen;
952     }
953 
954     private int getSendBufferSize() {
955         try {
956             return socket.getSendBufferSize();
957         } catch (IOException e) {
958             throw new ChannelException(e);
959         }
960     }
961 
962     private void setSendBufferSize(int sendBufferSize) {
963         try {
964             socket.setSendBufferSize(sendBufferSize);
965         } catch (IOException e) {
966             throw new ChannelException(e);
967         }
968     }
969 
970     private int getReceiveBufferSize() {
971         try {
972             return socket.getReceiveBufferSize();
973         } catch (IOException e) {
974             throw new ChannelException(e);
975         }
976     }
977 
978     private void setReceiveBufferSize(int receiveBufferSize) {
979         try {
980             socket.setReceiveBufferSize(receiveBufferSize);
981         } catch (IOException e) {
982             throw new ChannelException(e);
983         }
984     }
985 
986     private int getTrafficClass() {
987         try {
988             return socket.getTrafficClass();
989         } catch (IOException e) {
990             throw new ChannelException(e);
991         }
992     }
993 
994     private void setTrafficClass(int trafficClass) {
995         try {
996             socket.setTrafficClass(trafficClass);
997         } catch (IOException e) {
998             throw new ChannelException(e);
999         }
1000     }
1001 
1002     private boolean isReuseAddress() {
1003         try {
1004             return socket.isReuseAddress();
1005         } catch (IOException e) {
1006             throw new ChannelException(e);
1007         }
1008     }
1009 
1010     private void setReuseAddress(boolean reuseAddress) {
1011         try {
1012             socket.setReuseAddress(reuseAddress);
1013         } catch (IOException e) {
1014             throw new ChannelException(e);
1015         }
1016     }
1017 
1018     private boolean isBroadcast() {
1019         try {
1020             return socket.isBroadcast();
1021         } catch (IOException e) {
1022             throw new ChannelException(e);
1023         }
1024     }
1025 
1026     private void setBroadcast(boolean broadcast) {
1027         try {
1028             socket.setBroadcast(broadcast);
1029         } catch (IOException e) {
1030             throw new ChannelException(e);
1031         }
1032     }
1033 
1034     private boolean isLoopbackModeDisabled() {
1035         try {
1036             return socket.isLoopbackModeDisabled();
1037         } catch (IOException e) {
1038             throw new ChannelException(e);
1039         }
1040     }
1041 
1042     private void setLoopbackModeDisabled(boolean loopbackModeDisabled) {
1043         try {
1044             socket.setLoopbackModeDisabled(loopbackModeDisabled);
1045         } catch (IOException e) {
1046             throw new ChannelException(e);
1047         }
1048     }
1049 
1050     private int getTimeToLive() {
1051         try {
1052             return socket.getTimeToLive();
1053         } catch (IOException e) {
1054             throw new ChannelException(e);
1055         }
1056     }
1057 
1058     private void setTimeToLive(int ttl) {
1059         try {
1060             socket.setTimeToLive(ttl);
1061         } catch (IOException e) {
1062             throw new ChannelException(e);
1063         }
1064     }
1065 
1066     private NetworkInterface getNetworkInterface() {
1067         try {
1068             return socket.getNetworkInterface();
1069         } catch (IOException e) {
1070             throw new ChannelException(e);
1071         }
1072     }
1073 
1074     private void setNetworkInterface(NetworkInterface networkInterface) {
1075         try {
1076             socket.setNetworkInterface(networkInterface);
1077         } catch (IOException e) {
1078             throw new ChannelException(e);
1079         }
1080     }
1081 
1082     /**
1083      * Returns {@code true} if the SO_REUSEPORT option is set.
1084      */
1085     private boolean isReusePort() {
1086         try {
1087             return socket.isReusePort();
1088         } catch (IOException e) {
1089             throw new ChannelException(e);
1090         }
1091     }
1092 
1093     /**
1094      * Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple
1095      * {@link EpollSocketChannel}s to the same port and so accept connections with multiple threads.
1096      *
1097      * Be aware this method needs be called before {@link EpollDatagramChannel#bind(java.net.SocketAddress)} to have
1098      * any affect.
1099      */
1100     private void setReusePort(boolean reusePort) {
1101         try {
1102             socket.setReusePort(reusePort);
1103         } catch (IOException e) {
1104             throw new ChannelException(e);
1105         }
1106     }
1107 
1108     /**
1109      * Returns {@code true} if <a href="https://man7.org/linux/man-pages/man7/ip.7.html">IP_TRANSPARENT</a> is enabled,
1110      * {@code false} otherwise.
1111      */
1112     private boolean isIpTransparent() {
1113         try {
1114             return socket.isIpTransparent();
1115         } catch (IOException e) {
1116             throw new ChannelException(e);
1117         }
1118     }
1119 
1120     /**
1121      * If {@code true} is used <a href="https://man7.org/linux/man-pages/man7/ip.7.html">IP_TRANSPARENT</a> is enabled,
1122      * {@code false} for disable it. Default is disabled.
1123      */
1124     private void setIpTransparent(boolean ipTransparent) {
1125         try {
1126             socket.setIpTransparent(ipTransparent);
1127         } catch (IOException e) {
1128             throw new ChannelException(e);
1129         }
1130     }
1131 
1132     /**
1133      * Returns {@code true} if <a href="https://man7.org/linux/man-pages/man7/ip.7.html">IP_FREEBIND</a> is enabled,
1134      * {@code false} otherwise.
1135      */
1136     private boolean isFreeBind() {
1137         try {
1138             return socket.isIpFreeBind();
1139         } catch (IOException e) {
1140             throw new ChannelException(e);
1141         }
1142     }
1143 
1144     /**
1145      * If {@code true} is used <a href="https://man7.org/linux/man-pages/man7/ip.7.html">IP_FREEBIND</a> is enabled,
1146      * {@code false} for disable it. Default is disabled.
1147      */
1148     private void setFreeBind(boolean freeBind) {
1149         try {
1150             socket.setIpFreeBind(freeBind);
1151         } catch (IOException e) {
1152             throw new ChannelException(e);
1153         }
1154     }
1155 
1156     /**
1157      * Returns {@code true} if <a href="https://man7.org/linux/man-pages/man7/ip.7.html">IP_RECVORIGDSTADDR</a> is
1158      * enabled, {@code false} otherwise.
1159      */
1160     private boolean isIpRecvOrigDestAddr() {
1161         try {
1162             return socket.isIpRecvOrigDestAddr();
1163         } catch (IOException e) {
1164             throw new ChannelException(e);
1165         }
1166     }
1167 
1168     /**
1169      * If {@code true} is used <a href="https://man7.org/linux/man-pages/man7/ip.7.html">IP_RECVORIGDSTADDR</a> is
1170      * enabled, {@code false} for disable it. Default is disabled.
1171      */
1172     private void setIpRecvOrigDestAddr(boolean ipTransparent) {
1173         try {
1174             socket.setIpRecvOrigDestAddr(ipTransparent);
1175         } catch (IOException e) {
1176             throw new ChannelException(e);
1177         }
1178     }
1179 
1180     /**
1181      * Set the maximum {@link io.netty5.channel.socket.DatagramPacket} size. This will be used to determine if
1182      * {@code recvmmsg} should be used when reading from the underlying socket. When {@code recvmmsg} is used
1183      * we may be able to read multiple {@link io.netty5.channel.socket.DatagramPacket}s with one syscall and so
1184      * greatly improve the performance. This number will be used to split {@link Buffer}s returned by the used
1185      * {@link RecvBufferAllocator}. You can use {@code 0} to disable the usage of recvmmsg, any other bigger value
1186      * will enable it.
1187      */
1188     private void setMaxDatagramPayloadSize(int maxDatagramSize) {
1189         this.maxDatagramSize = ObjectUtil.checkPositiveOrZero(maxDatagramSize, "maxDatagramSize");
1190     }
1191 
1192     /**
1193      * Get the maximum {@link io.netty5.channel.socket.DatagramPacket} size.
1194      */
1195     private int getMaxDatagramPayloadSize() {
1196         return maxDatagramSize;
1197     }
1198 
1199     /**
1200      * Enable / disable <a href="https://lwn.net/Articles/768995/">UDP_GRO</a>.
1201      * @param gro {@code true} if {@code UDP_GRO} should be enabled, {@code false} otherwise.
1202      */
1203     private void setUdpGro(boolean gro) {
1204         try {
1205             socket.setUdpGro(gro);
1206         } catch (IOException e) {
1207             throw new ChannelException(e);
1208         }
1209         this.gro = gro;
1210     }
1211 
1212     /**
1213      * Returns if {@code UDP_GRO} is enabled.
1214      * @return {@code true} if enabled, {@code false} otherwise.
1215      */
1216     private boolean isUdpGro() {
1217         // We don't do a syscall here but just return the cached value due a kernel bug:
1218         // https://lore.kernel.org/netdev/[email protected]/T/#u
1219         return gro;
1220     }
1221 }