View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultAddressedEnvelope;
27  import io.netty.channel.IoRegistration;
28  import io.netty.channel.socket.DatagramChannel;
29  import io.netty.channel.socket.DatagramChannelConfig;
30  import io.netty.channel.socket.DatagramPacket;
31  import io.netty.channel.socket.SocketProtocolFamily;
32  import io.netty.channel.unix.Errors;
33  import io.netty.channel.unix.Errors.NativeIoException;
34  import io.netty.channel.unix.SegmentedDatagramPacket;
35  import io.netty.channel.unix.Socket;
36  import io.netty.util.UncheckedBooleanSupplier;
37  import io.netty.util.internal.ObjectUtil;
38  import io.netty.util.internal.StringUtil;
39  import io.netty.util.internal.SystemPropertyUtil;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.net.Inet4Address;
45  import java.net.InetAddress;
46  import java.net.InetSocketAddress;
47  import java.net.NetworkInterface;
48  import java.net.PortUnreachableException;
49  import java.net.SocketAddress;
50  import java.nio.channels.UnresolvedAddressException;
51  
52  import static io.netty.channel.unix.Errors.ioResult;
53  
54  public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
55      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringDatagramChannel.class);
56      private static final boolean IP_MULTICAST_ALL =
57              SystemPropertyUtil.getBoolean("io.netty.channel.iouring.ipMulticastAll", false);
58      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
59      private static final String EXPECTED_TYPES =
60              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
61              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
62              StringUtil.simpleClassName(ByteBuf.class) + ", " +
63              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
64              StringUtil.simpleClassName(ByteBuf.class) + ')';
65  
66      private final IoUringDatagramChannelConfig config;
67      private volatile boolean connected;
68  
69      static {
70          if (logger.isDebugEnabled()) {
71              logger.debug("-Dio.netty.channel.iouring.ipMulticastAll: {}", IP_MULTICAST_ALL);
72          }
73      }
74  
75      // These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg
76      //
77      // TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use
78      //       some sort of other pool. Let's keep it simple for now.
79      //
80      // Consider exposing some configuration for that.
81      private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
82      private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
83      private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
84  
85      /**
86       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
87       * on the Operation Systems default which will be chosen.
88       */
89      public IoUringDatagramChannel() {
90          this(null);
91      }
92  
93      /**
94       * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used it will depend
95       * on the Operation Systems default which will be chosen.
96       */
97      public IoUringDatagramChannel(SocketProtocolFamily family) {
98          this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
99      }
100 
101     private static boolean useIpv6(SocketProtocolFamily family) {
102         if (family == null) {
103             return Socket.isIPv6Preferred();
104         }
105         return family == SocketProtocolFamily.INET6;
106     }
107 
108     /**
109      * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
110      * on the Operation Systems default which will be chosen.
111      */
112     public IoUringDatagramChannel(int fd) {
113         this(new LinuxSocket(fd), true);
114     }
115 
116     private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
117         // Always use a blocking fd and so make use of fast-poll.
118         super(null, fd, active);
119 
120         // Configure IP_MULTICAST_ALL - disable by default to match the behaviour of NIO.
121         try {
122             fd.setIpMulticastAll(IP_MULTICAST_ALL);
123         } catch (IOException | ChannelException e) {
124             logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, e);
125         }
126 
127         config = new IoUringDatagramChannelConfig(this);
128     }
129 
130     @Override
131     public InetSocketAddress remoteAddress() {
132         return (InetSocketAddress) super.remoteAddress();
133     }
134 
135     @Override
136     public InetSocketAddress localAddress() {
137         return (InetSocketAddress) super.localAddress();
138     }
139 
140     @Override
141     public ChannelMetadata metadata() {
142         return METADATA;
143     }
144 
145     @Override
146     public boolean isActive() {
147         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
148     }
149 
150     @Override
151     public boolean isConnected() {
152         return connected;
153     }
154 
155     @Override
156     public ChannelFuture joinGroup(InetAddress multicastAddress) {
157         return joinGroup(multicastAddress, newPromise());
158     }
159 
160     @Override
161     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
162         try {
163             return joinGroup(
164                     multicastAddress,
165                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
166         } catch (IOException e) {
167             promise.setFailure(e);
168         }
169         return promise;
170     }
171 
172     @Override
173     public ChannelFuture joinGroup(
174             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
175         return joinGroup(multicastAddress, networkInterface, newPromise());
176     }
177 
178     @Override
179     public ChannelFuture joinGroup(
180             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
181             ChannelPromise promise) {
182         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
183     }
184 
185     @Override
186     public ChannelFuture joinGroup(
187             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
188         return joinGroup(multicastAddress, networkInterface, source, newPromise());
189     }
190 
191     @Override
192     public ChannelFuture joinGroup(
193             final InetAddress multicastAddress, final NetworkInterface networkInterface,
194             final InetAddress source, final ChannelPromise promise) {
195 
196         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
197         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
198 
199         try {
200             socket.joinGroup(multicastAddress, networkInterface, source);
201             promise.setSuccess();
202         } catch (IOException e) {
203             promise.setFailure(e);
204         }
205         return promise;
206     }
207 
208     @Override
209     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
210         return leaveGroup(multicastAddress, newPromise());
211     }
212 
213     @Override
214     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
215         try {
216             return leaveGroup(
217                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
218         } catch (IOException e) {
219             promise.setFailure(e);
220         }
221         return promise;
222     }
223 
224     @Override
225     public ChannelFuture leaveGroup(
226             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
227         return leaveGroup(multicastAddress, networkInterface, newPromise());
228     }
229 
230     @Override
231     public ChannelFuture leaveGroup(
232             InetSocketAddress multicastAddress,
233             NetworkInterface networkInterface, ChannelPromise promise) {
234         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
235     }
236 
237     @Override
238     public ChannelFuture leaveGroup(
239             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
240         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
241     }
242 
243     @Override
244     public ChannelFuture leaveGroup(
245             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
246             final ChannelPromise promise) {
247         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
248         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
249 
250         try {
251             socket.leaveGroup(multicastAddress, networkInterface, source);
252             promise.setSuccess();
253         } catch (IOException e) {
254             promise.setFailure(e);
255         }
256         return promise;
257     }
258 
259     @Override
260     public ChannelFuture block(
261             InetAddress multicastAddress, NetworkInterface networkInterface,
262             InetAddress sourceToBlock) {
263         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
264     }
265 
266     @Override
267     public ChannelFuture block(
268             final InetAddress multicastAddress, final NetworkInterface networkInterface,
269             final InetAddress sourceToBlock, final ChannelPromise promise) {
270         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
271         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
272         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
273 
274         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
275         return promise;
276     }
277 
278     @Override
279     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
280         return block(multicastAddress, sourceToBlock, newPromise());
281     }
282 
283     @Override
284     public ChannelFuture block(
285             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
286         try {
287             return block(
288                     multicastAddress,
289                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
290                     sourceToBlock, promise);
291         } catch (Throwable e) {
292             promise.setFailure(e);
293         }
294         return promise;
295     }
296 
297     @Override
298     protected AbstractUnsafe newUnsafe() {
299         return new IoUringDatagramChannelUnsafe();
300     }
301 
302     @Override
303     protected void doBind(SocketAddress localAddress) throws Exception {
304         if (localAddress instanceof InetSocketAddress) {
305             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
306             if (socketAddress.getAddress().isAnyLocalAddress() &&
307                     socketAddress.getAddress() instanceof Inet4Address) {
308                 if (socket.family() == SocketProtocolFamily.INET6) {
309                     localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
310                 }
311             }
312         }
313         super.doBind(localAddress);
314         active = true;
315     }
316 
317     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
318         if (envelope.recipient() instanceof InetSocketAddress
319                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
320             throw new UnresolvedAddressException();
321         }
322     }
323 
324     @Override
325     protected Object filterOutboundMessage(Object msg) {
326         if (msg instanceof DatagramPacket) {
327             DatagramPacket packet = (DatagramPacket) msg;
328             checkUnresolved(packet);
329             ByteBuf content = packet.content();
330             return !content.hasMemoryAddress() ?
331                     packet.replace(newDirectBuffer(packet, content)) : msg;
332         }
333 
334         if (msg instanceof ByteBuf) {
335             ByteBuf buf = (ByteBuf) msg;
336             return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
337         }
338 
339         if (msg instanceof AddressedEnvelope) {
340             @SuppressWarnings("unchecked")
341             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
342             checkUnresolved(e);
343             if (e.content() instanceof ByteBuf &&
344                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
345 
346                 ByteBuf content = (ByteBuf) e.content();
347                 return !content.hasMemoryAddress()?
348                         new DefaultAddressedEnvelope<>(
349                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
350             }
351         }
352 
353         throw new UnsupportedOperationException(
354                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
355     }
356 
357     @Override
358     public DatagramChannelConfig config() {
359         return config;
360     }
361 
362     @Override
363     protected void doDisconnect() throws Exception {
364         // TODO: use io_uring for this too...
365         socket.disconnect();
366         connected = active = false;
367 
368         resetCachedAddresses();
369     }
370 
371     @Override
372     protected void doClose() throws Exception {
373         super.doClose();
374         connected = false;
375     }
376 
377     private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
378         private final WriteProcessor writeProcessor = new WriteProcessor();
379 
380         private ByteBuf readBuffer;
381 
382         private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
383             private int written;
384             @Override
385             public boolean processMessage(Object msg) {
386                 if (scheduleWrite(msg, written == 0)) {
387                     written++;
388                     return true;
389                 }
390                 return false;
391             }
392 
393             int write(ChannelOutboundBuffer in) {
394                 written = 0;
395                 try {
396                     in.forEachFlushedMessage(this);
397                 } catch (Exception e) {
398                     // This should never happen as our processMessage(...) never throws.
399                     throw new IllegalStateException(e);
400                 }
401                 return written;
402             }
403         }
404 
405         @Override
406         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
407             assert outstanding != -1 : "multi-shot not implemented yet";
408 
409             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
410             final ChannelPipeline pipeline = pipeline();
411             ByteBuf byteBuf = this.readBuffer;
412             assert byteBuf != null;
413             try {
414                 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
415             } catch (Throwable t) {
416                 Throwable e = (connected && t instanceof NativeIoException) ?
417                   translateForConnected((NativeIoException) t) : t;
418                 pipeline.fireExceptionCaught(e);
419             }
420         }
421 
422         private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
423                                       ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
424                 throws IOException {
425             MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
426             if (res < 0) {
427                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
428                     // If res is negative we should pass it to ioResult(...) which will either throw
429                     // or convert it to 0 if we could not read because the socket was not readable.
430                     allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
431                 }
432             } else {
433                 allocHandle.lastBytesRead(res);
434                 if (hdr.hasPort(IoUringDatagramChannel.this)) {
435                     allocHandle.incMessagesRead(1);
436                     DatagramPacket packet = hdr.get(
437                             IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
438                     pipeline.fireChannelRead(packet);
439                 }
440             }
441 
442             // Reset the id as this read was completed and so don't need to be cancelled later.
443             recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
444             if (outstanding == 0) {
445                 // There are no outstanding completion events, release the readBuffer and see if we need to schedule
446                 // another one or if the user will do it.
447                 this.readBuffer.release();
448                 this.readBuffer = null;
449                 recvmsgHdrs.clear();
450 
451                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
452                     if (allocHandle.lastBytesRead() > 0 &&
453                             allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
454                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
455                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
456                             // that the next read (which would be using Native.MSG_DONTWAIT) will complete without
457                             // be able to read any data. This is useless work and we can skip it.
458                             (!IoUring.isCqeFSockNonEmptySupported() ||
459                                     (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
460                         // Let's schedule another read.
461                         scheduleRead(false);
462                     } else {
463                         // the read was completed with EAGAIN.
464                         allocHandle.readComplete();
465                         pipeline.fireChannelReadComplete();
466                     }
467                 }
468             }
469         }
470 
471         @Override
472         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
473             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
474             ByteBuf byteBuf = allocHandle.allocate(alloc());
475             assert readBuffer == null;
476             readBuffer = byteBuf;
477 
478             int writable = byteBuf.writableBytes();
479             allocHandle.attemptedBytesRead(writable);
480             int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
481 
482             int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
483 
484             int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
485             if (scheduled == 0) {
486                 // We could not schedule any recvmmsg so we need to release the buffer as there will be no
487                 // completion event.
488                 readBuffer = null;
489                 byteBuf.release();
490             }
491             return scheduled;
492         }
493 
494         private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
495             int writable = byteBuf.writableBytes();
496             long bufferAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
497             if (numDatagram <= 1) {
498                 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
499             }
500             int i = 0;
501             // Add multiple IORING_OP_RECVMSG to the submission queue. This basically emulates recvmmsg(...)
502             for (; i < numDatagram && writable >= datagramSize; i++) {
503                 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
504                     break;
505                 }
506                 bufferAddress += datagramSize;
507                 writable -= datagramSize;
508             }
509             return i;
510         }
511 
512         private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
513             MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
514             if (msgHdrMemory == null) {
515                 // We can not continue reading before we did not submit the recvmsg(s) and received the results.
516                 return false;
517             }
518             msgHdrMemory.set(socket, null, bufferAddress, bufferLength, (short) 0);
519 
520             int fd = fd().intValue();
521             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
522             IoRegistration registration = registration();
523             // We always use idx here so we can detect if no idx was used by checking if data < 0 in
524             // readComplete0(...)
525             IoUringIoOps ops = IoUringIoOps.newRecvmsg(
526                     fd, (byte) 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
527             long id = registration.submit(ops);
528             if (id == 0) {
529                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
530                 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
531                 return false;
532             }
533             recvmsgHdrs.setId(msgHdrMemory.idx(), id);
534             return true;
535         }
536 
537         @Override
538         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
539             ChannelOutboundBuffer outboundBuffer = outboundBuffer();
540 
541             // Reset the id as this write was completed and so don't need to be cancelled later.
542             sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
543             sendmsgResArray[data] = res;
544             // Store the result so we can handle it as soon as we have no outstanding writes anymore.
545             if (outstanding == 0) {
546                 // All writes are done as part of a batch. Let's remove these from the ChannelOutboundBuffer
547                 boolean writtenSomething = false;
548                 int numWritten = sendmsgHdrs.length();
549                 sendmsgHdrs.clear();
550                 for (int i = 0; i < numWritten; i++) {
551                     writtenSomething |= removeFromOutboundBuffer(
552                             outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
553                 }
554                 return writtenSomething;
555             }
556             return true;
557         }
558 
559         private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
560             if (res >= 0) {
561                 // When using Datagram we should consider the message written as long as res is not negative.
562                 return outboundBuffer.remove();
563             }
564             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
565                 return false;
566             }
567             try {
568                 return ioResult(errormsg, res) != 0;
569             } catch (Throwable cause) {
570                 Throwable e = (connected && cause instanceof NativeIoException) ?
571                         translateForConnected((NativeIoException) cause) : cause;
572                 return outboundBuffer.remove(e);
573             }
574         }
575 
576         @Override
577         void connectComplete(byte op, int res, int flags, short data) {
578             if (res >= 0) {
579                 connected = true;
580             }
581             super.connectComplete(op, res, flags, data);
582         }
583 
584         @Override
585         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
586             return writeProcessor.write(in);
587         }
588 
589         @Override
590         protected int scheduleWriteSingle(Object msg) {
591             return scheduleWrite(msg, true) ? 1 : 0;
592         }
593 
594         private boolean scheduleWrite(Object msg, boolean first) {
595             final ByteBuf data;
596             final InetSocketAddress remoteAddress;
597             final int segmentSize;
598             if (msg instanceof AddressedEnvelope) {
599                 @SuppressWarnings("unchecked")
600                 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
601                         (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
602                 data = envelope.content();
603                 remoteAddress = envelope.recipient();
604                 if (msg instanceof SegmentedDatagramPacket) {
605                     segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
606                 } else {
607                     segmentSize = 0;
608                 }
609             } else {
610                 data = (ByteBuf) msg;
611                 remoteAddress = (InetSocketAddress) remoteAddress();
612                 segmentSize = 0;
613             }
614 
615             long bufferAddress = IoUring.memoryAddress(data);
616             return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
617         }
618 
619         private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
620                                         int bufferLength, int segmentSize, boolean first) {
621             MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
622             if (hdr == null) {
623                 // There is no MsgHdrMemory left to use. We need to submit and wait for the writes to complete
624                 // before we can write again.
625                 return false;
626             }
627             hdr.set(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
628 
629             int fd = fd().intValue();
630             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
631             IoRegistration registration = registration();
632             IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, msgFlags, hdr.address(), hdr.idx());
633             long id = registration.submit(ops);
634             if (id == 0) {
635                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
636                 sendmsgHdrs.restoreNextHdr(hdr);
637                 return false;
638             }
639             sendmsgHdrs.setId(hdr.idx(), id);
640             return true;
641         }
642 
643         @Override
644         public void unregistered() {
645             super.unregistered();
646             sendmsgHdrs.release();
647             recvmsgHdrs.release();
648         }
649     }
650 
651     private static IOException translateForConnected(NativeIoException e) {
652         // We need to correctly translate connect errors to match NIO behaviour.
653         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
654             PortUnreachableException error = new PortUnreachableException(e.getMessage());
655             error.initCause(e);
656             return error;
657         }
658         return e;
659     }
660 
661     /**
662      * Returns {@code true} if the usage of {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported.
663      *
664      * @return {@code true} if supported, {@code false} otherwise.
665      */
666     public static boolean isSegmentedDatagramPacketSupported() {
667         return IoUring.isAvailable();
668     }
669 
670     @Override
671     protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
672         if (numOutstandingReads > 0) {
673             int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
674             assert canceled == numOutstandingReads;
675         }
676     }
677 
678     @Override
679     protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
680         if (numOutstandingWrites > 0) {
681             int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
682             assert canceled == numOutstandingWrites;
683         }
684     }
685 
686     private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
687         int cancelled = 0;
688         for (int idx = 0; idx < array.length(); idx++) {
689             long id = array.id(idx);
690             if (id == MsgHdrMemoryArray.NO_ID) {
691                 continue;
692             }
693             // Let's try to cancel outstanding op as these might be submitted and waiting for data
694             // (via fastpoll).
695             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, id, op);
696             registration.submit(ops);
697             cancelled++;
698         }
699         return cancelled;
700     }
701 
702     @Override
703     protected boolean socketIsEmpty(int flags) {
704         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
705     }
706 
707     @Override
708     boolean isPollInFirst() {
709         return false;
710     }
711 }