View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultAddressedEnvelope;
26  import io.netty.channel.IoRegistration;
27  import io.netty.channel.socket.DatagramChannel;
28  import io.netty.channel.socket.DatagramChannelConfig;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.socket.SocketProtocolFamily;
31  import io.netty.channel.unix.Errors;
32  import io.netty.channel.unix.Errors.NativeIoException;
33  import io.netty.channel.unix.SegmentedDatagramPacket;
34  import io.netty.channel.unix.Socket;
35  import io.netty.util.UncheckedBooleanSupplier;
36  import io.netty.util.internal.ObjectUtil;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.SystemPropertyUtil;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.io.IOException;
43  import java.net.Inet4Address;
44  import java.net.InetAddress;
45  import java.net.InetSocketAddress;
46  import java.net.NetworkInterface;
47  import java.net.PortUnreachableException;
48  import java.net.SocketAddress;
49  import java.nio.channels.UnresolvedAddressException;
50  
51  import static io.netty.channel.unix.Errors.ioResult;
52  
53  public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
54      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringDatagramChannel.class);
55      private static final boolean IP_MULTICAST_ALL =
56              SystemPropertyUtil.getBoolean("io.netty.channel.iouring.ipMulticastAll", false);
57      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
58      private static final String EXPECTED_TYPES =
59              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
60              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
61              StringUtil.simpleClassName(ByteBuf.class) + ", " +
62              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
63              StringUtil.simpleClassName(ByteBuf.class) + ')';
64  
65      private final IoUringDatagramChannelConfig config;
66      private volatile boolean connected;
67  
68      static {
69          if (logger.isDebugEnabled()) {
70              logger.debug("-Dio.netty.channel.iouring.ipMulticastAll: {}", IP_MULTICAST_ALL);
71          }
72      }
73  
74      // These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg
75      //
76      // TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use
77      //       some sort of other pool. Let's keep it simple for now.
78      //
79      // Consider exposing some configuration for that.
80      private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
81      private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
82      private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
83  
84      /**
85       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
86       * on the Operation Systems default which will be chosen.
87       */
88      public IoUringDatagramChannel() {
89          this(null);
90      }
91  
92      /**
93       * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used it will depend
94       * on the Operation Systems default which will be chosen.
95       */
96      public IoUringDatagramChannel(SocketProtocolFamily family) {
97          this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
98      }
99  
100     private static boolean useIpv6(SocketProtocolFamily family) {
101         if (family == null) {
102             return Socket.isIPv6Preferred();
103         }
104         return family == SocketProtocolFamily.INET6;
105     }
106 
107     /**
108      * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
109      * on the Operation Systems default which will be chosen.
110      */
111     public IoUringDatagramChannel(int fd) {
112         this(new LinuxSocket(fd), true);
113     }
114 
115     private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
116         // Always use a blocking fd and so make use of fast-poll.
117         super(null, fd, active);
118 
119         // Configure IP_MULTICAST_ALL - disable by default to match the behaviour of NIO.
120         try {
121             fd.setIpMulticastAll(IP_MULTICAST_ALL);
122         } catch (IOException e) {
123             logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, e);
124         }
125 
126         config = new IoUringDatagramChannelConfig(this);
127     }
128 
129     @Override
130     public InetSocketAddress remoteAddress() {
131         return (InetSocketAddress) super.remoteAddress();
132     }
133 
134     @Override
135     public InetSocketAddress localAddress() {
136         return (InetSocketAddress) super.localAddress();
137     }
138 
139     @Override
140     public ChannelMetadata metadata() {
141         return METADATA;
142     }
143 
144     @Override
145     public boolean isActive() {
146         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
147     }
148 
149     @Override
150     public boolean isConnected() {
151         return connected;
152     }
153 
154     @Override
155     public ChannelFuture joinGroup(InetAddress multicastAddress) {
156         return joinGroup(multicastAddress, newPromise());
157     }
158 
159     @Override
160     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
161         try {
162             return joinGroup(
163                     multicastAddress,
164                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
165         } catch (IOException e) {
166             promise.setFailure(e);
167         }
168         return promise;
169     }
170 
171     @Override
172     public ChannelFuture joinGroup(
173             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
174         return joinGroup(multicastAddress, networkInterface, newPromise());
175     }
176 
177     @Override
178     public ChannelFuture joinGroup(
179             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
180             ChannelPromise promise) {
181         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
182     }
183 
184     @Override
185     public ChannelFuture joinGroup(
186             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
187         return joinGroup(multicastAddress, networkInterface, source, newPromise());
188     }
189 
190     @Override
191     public ChannelFuture joinGroup(
192             final InetAddress multicastAddress, final NetworkInterface networkInterface,
193             final InetAddress source, final ChannelPromise promise) {
194 
195         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
196         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
197 
198         try {
199             socket.joinGroup(multicastAddress, networkInterface, source);
200             promise.setSuccess();
201         } catch (IOException e) {
202             promise.setFailure(e);
203         }
204         return promise;
205     }
206 
207     @Override
208     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
209         return leaveGroup(multicastAddress, newPromise());
210     }
211 
212     @Override
213     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
214         try {
215             return leaveGroup(
216                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
217         } catch (IOException e) {
218             promise.setFailure(e);
219         }
220         return promise;
221     }
222 
223     @Override
224     public ChannelFuture leaveGroup(
225             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
226         return leaveGroup(multicastAddress, networkInterface, newPromise());
227     }
228 
229     @Override
230     public ChannelFuture leaveGroup(
231             InetSocketAddress multicastAddress,
232             NetworkInterface networkInterface, ChannelPromise promise) {
233         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
234     }
235 
236     @Override
237     public ChannelFuture leaveGroup(
238             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
239         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
240     }
241 
242     @Override
243     public ChannelFuture leaveGroup(
244             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
245             final ChannelPromise promise) {
246         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
247         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
248 
249         try {
250             socket.leaveGroup(multicastAddress, networkInterface, source);
251             promise.setSuccess();
252         } catch (IOException e) {
253             promise.setFailure(e);
254         }
255         return promise;
256     }
257 
258     @Override
259     public ChannelFuture block(
260             InetAddress multicastAddress, NetworkInterface networkInterface,
261             InetAddress sourceToBlock) {
262         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
263     }
264 
265     @Override
266     public ChannelFuture block(
267             final InetAddress multicastAddress, final NetworkInterface networkInterface,
268             final InetAddress sourceToBlock, final ChannelPromise promise) {
269         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
270         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
271         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
272 
273         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
274         return promise;
275     }
276 
277     @Override
278     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
279         return block(multicastAddress, sourceToBlock, newPromise());
280     }
281 
282     @Override
283     public ChannelFuture block(
284             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
285         try {
286             return block(
287                     multicastAddress,
288                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
289                     sourceToBlock, promise);
290         } catch (Throwable e) {
291             promise.setFailure(e);
292         }
293         return promise;
294     }
295 
296     @Override
297     protected AbstractUnsafe newUnsafe() {
298         return new IoUringDatagramChannelUnsafe();
299     }
300 
301     @Override
302     protected void doBind(SocketAddress localAddress) throws Exception {
303         if (localAddress instanceof InetSocketAddress) {
304             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
305             if (socketAddress.getAddress().isAnyLocalAddress() &&
306                     socketAddress.getAddress() instanceof Inet4Address) {
307                 if (socket.family() == SocketProtocolFamily.INET6) {
308                     localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
309                 }
310             }
311         }
312         super.doBind(localAddress);
313         active = true;
314     }
315 
316     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
317         if (envelope.recipient() instanceof InetSocketAddress
318                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
319             throw new UnresolvedAddressException();
320         }
321     }
322 
323     @Override
324     protected Object filterOutboundMessage(Object msg) {
325         if (msg instanceof DatagramPacket) {
326             DatagramPacket packet = (DatagramPacket) msg;
327             checkUnresolved(packet);
328             ByteBuf content = packet.content();
329             return !content.hasMemoryAddress() ?
330                     packet.replace(newDirectBuffer(packet, content)) : msg;
331         }
332 
333         if (msg instanceof ByteBuf) {
334             ByteBuf buf = (ByteBuf) msg;
335             return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
336         }
337 
338         if (msg instanceof AddressedEnvelope) {
339             @SuppressWarnings("unchecked")
340             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
341             checkUnresolved(e);
342             if (e.content() instanceof ByteBuf &&
343                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
344 
345                 ByteBuf content = (ByteBuf) e.content();
346                 return !content.hasMemoryAddress()?
347                         new DefaultAddressedEnvelope<>(
348                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
349             }
350         }
351 
352         throw new UnsupportedOperationException(
353                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
354     }
355 
356     @Override
357     public DatagramChannelConfig config() {
358         return config;
359     }
360 
361     @Override
362     protected void doDisconnect() throws Exception {
363         // TODO: use io_uring for this too...
364         socket.disconnect();
365         connected = active = false;
366 
367         resetCachedAddresses();
368     }
369 
370     @Override
371     protected void doClose() throws Exception {
372         super.doClose();
373         connected = false;
374     }
375 
376     private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
377         private final WriteProcessor writeProcessor = new WriteProcessor();
378 
379         private ByteBuf readBuffer;
380 
381         private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
382             private int written;
383             @Override
384             public boolean processMessage(Object msg) {
385                 if (scheduleWrite(msg, written == 0)) {
386                     written++;
387                     return true;
388                 }
389                 return false;
390             }
391 
392             int write(ChannelOutboundBuffer in) {
393                 written = 0;
394                 try {
395                     in.forEachFlushedMessage(this);
396                 } catch (Exception e) {
397                     // This should never happen as our processMessage(...) never throws.
398                     throw new IllegalStateException(e);
399                 }
400                 return written;
401             }
402         }
403 
404         @Override
405         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
406             assert outstanding != -1 : "multi-shot not implemented yet";
407 
408             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
409             final ChannelPipeline pipeline = pipeline();
410             ByteBuf byteBuf = this.readBuffer;
411             assert byteBuf != null;
412             try {
413                 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
414             } catch (Throwable t) {
415                 Throwable e = (connected && t instanceof NativeIoException) ?
416                   translateForConnected((NativeIoException) t) : t;
417                 pipeline.fireExceptionCaught(e);
418             }
419         }
420 
421         private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
422                                       ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
423                 throws IOException {
424             MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
425             if (res < 0) {
426                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
427                     // If res is negative we should pass it to ioResult(...) which will either throw
428                     // or convert it to 0 if we could not read because the socket was not readable.
429                     allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
430                 }
431             } else {
432                 allocHandle.lastBytesRead(res);
433                 if (hdr.hasPort(IoUringDatagramChannel.this)) {
434                     allocHandle.incMessagesRead(1);
435                     DatagramPacket packet = hdr.get(
436                             IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
437                     pipeline.fireChannelRead(packet);
438                 }
439             }
440 
441             // Reset the id as this read was completed and so don't need to be cancelled later.
442             recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
443             if (outstanding == 0) {
444                 // There are no outstanding completion events, release the readBuffer and see if we need to schedule
445                 // another one or if the user will do it.
446                 this.readBuffer.release();
447                 this.readBuffer = null;
448                 recvmsgHdrs.clear();
449 
450                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
451                     if (allocHandle.lastBytesRead() > 0 &&
452                             allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
453                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
454                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
455                             // that the next read (which would be using Native.MSG_DONTWAIT) will complete without
456                             // be able to read any data. This is useless work and we can skip it.
457                             (!IoUring.isCqeFSockNonEmptySupported() ||
458                                     (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
459                         // Let's schedule another read.
460                         scheduleRead(false);
461                     } else {
462                         // the read was completed with EAGAIN.
463                         allocHandle.readComplete();
464                         pipeline.fireChannelReadComplete();
465                     }
466                 }
467             }
468         }
469 
470         @Override
471         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
472             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
473             ByteBuf byteBuf = allocHandle.allocate(alloc());
474             assert readBuffer == null;
475             readBuffer = byteBuf;
476 
477             int writable = byteBuf.writableBytes();
478             allocHandle.attemptedBytesRead(writable);
479             int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
480 
481             int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
482 
483             int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
484             if (scheduled == 0) {
485                 // We could not schedule any recvmmsg so we need to release the buffer as there will be no
486                 // completion event.
487                 readBuffer = null;
488                 byteBuf.release();
489             }
490             return scheduled;
491         }
492 
493         private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
494             int writable = byteBuf.writableBytes();
495             long bufferAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
496             if (numDatagram <= 1) {
497                 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
498             }
499             int i = 0;
500             // Add multiple IORING_OP_RECVMSG to the submission queue. This basically emulates recvmmsg(...)
501             for (; i < numDatagram && writable >= datagramSize; i++) {
502                 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
503                     break;
504                 }
505                 bufferAddress += datagramSize;
506                 writable -= datagramSize;
507             }
508             return i;
509         }
510 
511         private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
512             MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
513             if (msgHdrMemory == null) {
514                 // We can not continue reading before we did not submit the recvmsg(s) and received the results.
515                 return false;
516             }
517             msgHdrMemory.set(socket, null, bufferAddress, bufferLength, (short) 0);
518 
519             int fd = fd().intValue();
520             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
521             IoRegistration registration = registration();
522             // We always use idx here so we can detect if no idx was used by checking if data < 0 in
523             // readComplete0(...)
524             IoUringIoOps ops = IoUringIoOps.newRecvmsg(
525                     fd, (byte) 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
526             long id = registration.submit(ops);
527             if (id == 0) {
528                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
529                 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
530                 return false;
531             }
532             recvmsgHdrs.setId(msgHdrMemory.idx(), id);
533             return true;
534         }
535 
536         @Override
537         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
538             ChannelOutboundBuffer outboundBuffer = outboundBuffer();
539 
540             // Reset the id as this write was completed and so don't need to be cancelled later.
541             sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
542             sendmsgResArray[data] = res;
543             // Store the result so we can handle it as soon as we have no outstanding writes anymore.
544             if (outstanding == 0) {
545                 // All writes are done as part of a batch. Let's remove these from the ChannelOutboundBuffer
546                 boolean writtenSomething = false;
547                 int numWritten = sendmsgHdrs.length();
548                 sendmsgHdrs.clear();
549                 for (int i = 0; i < numWritten; i++) {
550                     writtenSomething |= removeFromOutboundBuffer(
551                             outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
552                 }
553                 return writtenSomething;
554             }
555             return true;
556         }
557 
558         private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
559             if (res >= 0) {
560                 // When using Datagram we should consider the message written as long as res is not negative.
561                 return outboundBuffer.remove();
562             }
563             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
564                 return false;
565             }
566             try {
567                 return ioResult(errormsg, res) != 0;
568             } catch (Throwable cause) {
569                 return outboundBuffer.remove(cause);
570             }
571         }
572 
573         @Override
574         void connectComplete(byte op, int res, int flags, short data) {
575             if (res >= 0) {
576                 connected = true;
577             }
578             super.connectComplete(op, res, flags, data);
579         }
580 
581         @Override
582         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
583             return writeProcessor.write(in);
584         }
585 
586         @Override
587         protected int scheduleWriteSingle(Object msg) {
588             return scheduleWrite(msg, true) ? 1 : 0;
589         }
590 
591         private boolean scheduleWrite(Object msg, boolean first) {
592             final ByteBuf data;
593             final InetSocketAddress remoteAddress;
594             final int segmentSize;
595             if (msg instanceof AddressedEnvelope) {
596                 @SuppressWarnings("unchecked")
597                 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
598                         (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
599                 data = envelope.content();
600                 remoteAddress = envelope.recipient();
601                 if (msg instanceof SegmentedDatagramPacket) {
602                     segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
603                 } else {
604                     segmentSize = 0;
605                 }
606             } else {
607                 data = (ByteBuf) msg;
608                 remoteAddress = (InetSocketAddress) remoteAddress();
609                 segmentSize = 0;
610             }
611 
612             long bufferAddress = IoUring.memoryAddress(data);
613             return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
614         }
615 
616         private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
617                                         int bufferLength, int segmentSize, boolean first) {
618             MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
619             if (hdr == null) {
620                 // There is no MsgHdrMemory left to use. We need to submit and wait for the writes to complete
621                 // before we can write again.
622                 return false;
623             }
624             hdr.set(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
625 
626             int fd = fd().intValue();
627             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
628             IoRegistration registration = registration();
629             IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, msgFlags, hdr.address(), hdr.idx());
630             long id = registration.submit(ops);
631             if (id == 0) {
632                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
633                 sendmsgHdrs.restoreNextHdr(hdr);
634                 return false;
635             }
636             sendmsgHdrs.setId(hdr.idx(), id);
637             return true;
638         }
639 
640         @Override
641         public void unregistered() {
642             super.unregistered();
643             sendmsgHdrs.release();
644             recvmsgHdrs.release();
645         }
646     }
647 
648     private static IOException translateForConnected(NativeIoException e) {
649         // We need to correctly translate connect errors to match NIO behaviour.
650         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
651             PortUnreachableException error = new PortUnreachableException(e.getMessage());
652             error.initCause(e);
653             return error;
654         }
655         return e;
656     }
657 
658     /**
659      * Returns {@code true} if the usage of {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported.
660      *
661      * @return {@code true} if supported, {@code false} otherwise.
662      */
663     public static boolean isSegmentedDatagramPacketSupported() {
664         return IoUring.isAvailable();
665     }
666 
667     @Override
668     protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
669         if (numOutstandingReads > 0) {
670             int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
671             assert canceled == numOutstandingReads;
672         }
673     }
674 
675     @Override
676     protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
677         if (numOutstandingWrites > 0) {
678             int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
679             assert canceled == numOutstandingWrites;
680         }
681     }
682 
683     private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
684         int cancelled = 0;
685         for (int idx = 0; idx < array.length(); idx++) {
686             long id = array.id(idx);
687             if (id == MsgHdrMemoryArray.NO_ID) {
688                 continue;
689             }
690             // Let's try to cancel outstanding op as these might be submitted and waiting for data
691             // (via fastpoll).
692             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, id, op);
693             registration.submit(ops);
694             cancelled++;
695         }
696         return cancelled;
697     }
698 
699     @Override
700     protected boolean socketIsEmpty(int flags) {
701         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
702     }
703 
704     @Override
705     boolean isPollInFirst() {
706         return false;
707     }
708 }