View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultAddressedEnvelope;
27  import io.netty.channel.IoRegistration;
28  import io.netty.channel.socket.DatagramChannel;
29  import io.netty.channel.socket.DatagramChannelConfig;
30  import io.netty.channel.socket.DatagramPacket;
31  import io.netty.channel.socket.SocketProtocolFamily;
32  import io.netty.channel.unix.Errors;
33  import io.netty.channel.unix.Errors.NativeIoException;
34  import io.netty.channel.unix.SegmentedDatagramPacket;
35  import io.netty.channel.unix.Socket;
36  import io.netty.util.UncheckedBooleanSupplier;
37  import io.netty.util.internal.ObjectUtil;
38  import io.netty.util.internal.StringUtil;
39  import io.netty.util.internal.SystemPropertyUtil;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.net.Inet4Address;
45  import java.net.InetAddress;
46  import java.net.InetSocketAddress;
47  import java.net.NetworkInterface;
48  import java.net.PortUnreachableException;
49  import java.net.SocketAddress;
50  import java.nio.channels.UnresolvedAddressException;
51  
52  import static io.netty.channel.unix.Errors.ioResult;
53  
54  public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
55      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringDatagramChannel.class);
56      private static final boolean IP_MULTICAST_ALL =
57              SystemPropertyUtil.getBoolean("io.netty.channel.iouring.ipMulticastAll", false);
58      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
59      private static final String EXPECTED_TYPES =
60              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
61              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
62              StringUtil.simpleClassName(ByteBuf.class) + ", " +
63              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
64              StringUtil.simpleClassName(ByteBuf.class) + ')';
65  
66      private final IoUringDatagramChannelConfig config;
67      private volatile boolean connected;
68  
69      static {
70          if (logger.isDebugEnabled()) {
71              logger.debug("-Dio.netty.channel.iouring.ipMulticastAll: {}", IP_MULTICAST_ALL);
72          }
73      }
74  
75      // These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg
76      //
77      // TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use
78      //       some sort of other pool. Let's keep it simple for now.
79      //
80      // Consider exposing some configuration for that.
81      private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
82      private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
83      private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
84  
85      /**
86       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
87       * on the Operation Systems default which will be chosen.
88       */
89      public IoUringDatagramChannel() {
90          this(null);
91      }
92  
93      /**
94       * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used it will depend
95       * on the Operation Systems default which will be chosen.
96       */
97      public IoUringDatagramChannel(SocketProtocolFamily family) {
98          this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
99      }
100 
101     private static boolean useIpv6(SocketProtocolFamily family) {
102         if (family == null) {
103             return Socket.isIPv6Preferred();
104         }
105         return family == SocketProtocolFamily.INET6;
106     }
107 
108     /**
109      * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
110      * on the Operation Systems default which will be chosen.
111      */
112     public IoUringDatagramChannel(int fd) {
113         this(new LinuxSocket(fd), true);
114     }
115 
116     private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
117         // Always use a blocking fd and so make use of fast-poll.
118         super(null, fd, active);
119 
120         // Configure IP_MULTICAST_ALL - disable by default to match the behaviour of NIO.
121         try {
122             fd.setIpMulticastAll(IP_MULTICAST_ALL);
123         } catch (IOException | ChannelException e) {
124             logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, e);
125         }
126 
127         config = new IoUringDatagramChannelConfig(this);
128     }
129 
130     @Override
131     protected boolean isStreamSocket() {
132         return false;
133     }
134 
135     @Override
136     public InetSocketAddress remoteAddress() {
137         return (InetSocketAddress) super.remoteAddress();
138     }
139 
140     @Override
141     public InetSocketAddress localAddress() {
142         return (InetSocketAddress) super.localAddress();
143     }
144 
145     @Override
146     public ChannelMetadata metadata() {
147         return METADATA;
148     }
149 
150     @Override
151     public boolean isActive() {
152         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
153     }
154 
155     @Override
156     public boolean isConnected() {
157         return connected;
158     }
159 
160     @Override
161     public ChannelFuture joinGroup(InetAddress multicastAddress) {
162         return joinGroup(multicastAddress, newPromise());
163     }
164 
165     @Override
166     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
167         try {
168             return joinGroup(
169                     multicastAddress,
170                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
171         } catch (IOException e) {
172             promise.setFailure(e);
173         }
174         return promise;
175     }
176 
177     @Override
178     public ChannelFuture joinGroup(
179             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
180         return joinGroup(multicastAddress, networkInterface, newPromise());
181     }
182 
183     @Override
184     public ChannelFuture joinGroup(
185             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
186             ChannelPromise promise) {
187         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
188     }
189 
190     @Override
191     public ChannelFuture joinGroup(
192             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
193         return joinGroup(multicastAddress, networkInterface, source, newPromise());
194     }
195 
196     @Override
197     public ChannelFuture joinGroup(
198             final InetAddress multicastAddress, final NetworkInterface networkInterface,
199             final InetAddress source, final ChannelPromise promise) {
200 
201         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
202         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
203 
204         try {
205             socket.joinGroup(multicastAddress, networkInterface, source);
206             promise.setSuccess();
207         } catch (IOException e) {
208             promise.setFailure(e);
209         }
210         return promise;
211     }
212 
213     @Override
214     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
215         return leaveGroup(multicastAddress, newPromise());
216     }
217 
218     @Override
219     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
220         try {
221             return leaveGroup(
222                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
223         } catch (IOException e) {
224             promise.setFailure(e);
225         }
226         return promise;
227     }
228 
229     @Override
230     public ChannelFuture leaveGroup(
231             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
232         return leaveGroup(multicastAddress, networkInterface, newPromise());
233     }
234 
235     @Override
236     public ChannelFuture leaveGroup(
237             InetSocketAddress multicastAddress,
238             NetworkInterface networkInterface, ChannelPromise promise) {
239         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
240     }
241 
242     @Override
243     public ChannelFuture leaveGroup(
244             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
245         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
246     }
247 
248     @Override
249     public ChannelFuture leaveGroup(
250             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
251             final ChannelPromise promise) {
252         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
253         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
254 
255         try {
256             socket.leaveGroup(multicastAddress, networkInterface, source);
257             promise.setSuccess();
258         } catch (IOException e) {
259             promise.setFailure(e);
260         }
261         return promise;
262     }
263 
264     @Override
265     public ChannelFuture block(
266             InetAddress multicastAddress, NetworkInterface networkInterface,
267             InetAddress sourceToBlock) {
268         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
269     }
270 
271     @Override
272     public ChannelFuture block(
273             final InetAddress multicastAddress, final NetworkInterface networkInterface,
274             final InetAddress sourceToBlock, final ChannelPromise promise) {
275         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
276         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
277         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
278 
279         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
280         return promise;
281     }
282 
283     @Override
284     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
285         return block(multicastAddress, sourceToBlock, newPromise());
286     }
287 
288     @Override
289     public ChannelFuture block(
290             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
291         try {
292             return block(
293                     multicastAddress,
294                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
295                     sourceToBlock, promise);
296         } catch (Throwable e) {
297             promise.setFailure(e);
298         }
299         return promise;
300     }
301 
302     @Override
303     protected AbstractUnsafe newUnsafe() {
304         return new IoUringDatagramChannelUnsafe();
305     }
306 
307     @Override
308     protected void doBind(SocketAddress localAddress) throws Exception {
309         if (localAddress instanceof InetSocketAddress) {
310             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
311             if (socketAddress.getAddress().isAnyLocalAddress() &&
312                     socketAddress.getAddress() instanceof Inet4Address) {
313                 if (socket.family() == SocketProtocolFamily.INET6) {
314                     localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
315                 }
316             }
317         }
318         super.doBind(localAddress);
319         active = true;
320     }
321 
322     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
323         if (envelope.recipient() instanceof InetSocketAddress
324                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
325             throw new UnresolvedAddressException();
326         }
327     }
328 
329     @Override
330     protected Object filterOutboundMessage(Object msg) {
331         if (msg instanceof DatagramPacket) {
332             DatagramPacket packet = (DatagramPacket) msg;
333             checkUnresolved(packet);
334             ByteBuf content = packet.content();
335             return !content.hasMemoryAddress() ?
336                     packet.replace(newDirectBuffer(packet, content)) : msg;
337         }
338 
339         if (msg instanceof ByteBuf) {
340             ByteBuf buf = (ByteBuf) msg;
341             return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
342         }
343 
344         if (msg instanceof AddressedEnvelope) {
345             @SuppressWarnings("unchecked")
346             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
347             checkUnresolved(e);
348             if (e.content() instanceof ByteBuf &&
349                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
350 
351                 ByteBuf content = (ByteBuf) e.content();
352                 return !content.hasMemoryAddress()?
353                         new DefaultAddressedEnvelope<>(
354                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
355             }
356         }
357 
358         throw new UnsupportedOperationException(
359                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
360     }
361 
362     @Override
363     public DatagramChannelConfig config() {
364         return config;
365     }
366 
367     @Override
368     protected void doDisconnect() throws Exception {
369         // TODO: use io_uring for this too...
370         socket.disconnect();
371         connected = active = false;
372 
373         resetCachedAddresses();
374     }
375 
376     @Override
377     protected void doClose() throws Exception {
378         super.doClose();
379         connected = false;
380     }
381 
382     private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
383         private final WriteProcessor writeProcessor = new WriteProcessor();
384 
385         private ByteBuf readBuffer;
386 
387         private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
388             private int written;
389             @Override
390             public boolean processMessage(Object msg) {
391                 if (scheduleWrite(msg, written == 0)) {
392                     written++;
393                     return true;
394                 }
395                 return false;
396             }
397 
398             int write(ChannelOutboundBuffer in) {
399                 written = 0;
400                 try {
401                     in.forEachFlushedMessage(this);
402                 } catch (Exception e) {
403                     // This should never happen as our processMessage(...) never throws.
404                     throw new IllegalStateException(e);
405                 }
406                 return written;
407             }
408         }
409 
410         @Override
411         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
412             assert outstanding != -1 : "multi-shot not implemented yet";
413 
414             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
415             final ChannelPipeline pipeline = pipeline();
416             ByteBuf byteBuf = this.readBuffer;
417             assert byteBuf != null;
418             MsgHdrMemory hdr = recvmsgHdrs.hdr(data);
419             // Reset the id as this read was completed and so don't need to be cancelled later.
420             recvmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
421 
422             try {
423                 if (res < 0) {
424                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
425                         // If res is negative we should pass it to ioResult(...) which will either throw
426                         // or convert it to 0 if we could not read because the socket was not readable.
427                         allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
428                     }
429                 } else {
430                     allocHandle.lastBytesRead(res);
431                     if (hdr.hasPort(IoUringDatagramChannel.this)) {
432                         allocHandle.incMessagesRead(1);
433                         DatagramPacket packet = hdr.get(
434                                 IoUringDatagramChannel.this, registration().attachment(), byteBuf, res);
435                         pipeline.fireChannelRead(packet);
436                     }
437                 }
438             } catch (Throwable t) {
439                 Throwable e = (connected && t instanceof NativeIoException) ?
440                         translateForConnected((NativeIoException) t) : t;
441                 pipeline.fireExceptionCaught(e);
442             }
443 
444             if (outstanding == 0) {
445                 // There are no outstanding completion events, release the readBuffer and see if we need to schedule
446                 // another one or if the user will do it.
447                 this.readBuffer.release();
448                 this.readBuffer = null;
449                 recvmsgHdrs.clear();
450 
451                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
452                     if (allocHandle.lastBytesRead() > 0 &&
453                             allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
454                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
455                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
456                             // that the next read (which would be using Native.MSG_DONTWAIT) will complete without
457                             // be able to read any data. This is useless work and we can skip it.
458                             (!IoUring.isCqeFSockNonEmptySupported() ||
459                                     (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
460                         // Let's schedule another read.
461                         scheduleRead(false);
462                     } else {
463                         // the read was completed with EAGAIN.
464                         allocHandle.readComplete();
465                         pipeline.fireChannelReadComplete();
466                     }
467                 }
468             }
469         }
470 
471         @Override
472         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
473             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
474             ByteBuf byteBuf = allocHandle.allocate(alloc());
475             assert readBuffer == null;
476             readBuffer = byteBuf;
477 
478             int writable = byteBuf.writableBytes();
479             allocHandle.attemptedBytesRead(writable);
480             int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
481 
482             int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
483 
484             int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
485             if (scheduled == 0) {
486                 // We could not schedule any recvmmsg so we need to release the buffer as there will be no
487                 // completion event.
488                 readBuffer = null;
489                 byteBuf.release();
490             }
491             return scheduled;
492         }
493 
494         private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
495             int writable = byteBuf.writableBytes();
496             long bufferAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
497             if (numDatagram <= 1) {
498                 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
499             }
500             int i = 0;
501             // Add multiple IORING_OP_RECVMSG to the submission queue. This basically emulates recvmmsg(...)
502             for (; i < numDatagram && writable >= datagramSize; i++) {
503                 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
504                     break;
505                 }
506                 bufferAddress += datagramSize;
507                 writable -= datagramSize;
508             }
509             return i;
510         }
511 
512         private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
513             MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
514             if (msgHdrMemory == null) {
515                 // We can not continue reading before we did not submit the recvmsg(s) and received the results.
516                 return false;
517             }
518             msgHdrMemory.set(socket, null, bufferAddress, bufferLength, (short) 0);
519 
520             int fd = fd().intValue();
521             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
522             IoRegistration registration = registration();
523             // We always use idx here so we can detect if no idx was used by checking if data < 0 in
524             // readComplete0(...)
525             IoUringIoOps ops = IoUringIoOps.newRecvmsg(
526                     fd, (byte) 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
527             long id = registration.submit(ops);
528             if (id == 0) {
529                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
530                 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
531                 return false;
532             }
533             recvmsgHdrs.setId(msgHdrMemory.idx(), id);
534             return true;
535         }
536 
537         @Override
538         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
539             ChannelOutboundBuffer outboundBuffer = outboundBuffer();
540 
541             // Reset the id as this write was completed and so don't need to be cancelled later.
542             sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
543             sendmsgResArray[data] = res;
544             // Store the result so we can handle it as soon as we have no outstanding writes anymore.
545             if (outstanding == 0) {
546                 // All writes are done as part of a batch. Let's remove these from the ChannelOutboundBuffer
547                 boolean writtenSomething = false;
548                 int numWritten = sendmsgHdrs.length();
549                 sendmsgHdrs.clear();
550                 for (int i = 0; i < numWritten; i++) {
551                     writtenSomething |= removeFromOutboundBuffer(
552                             outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
553                 }
554                 return writtenSomething;
555             }
556             return true;
557         }
558 
559         private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
560             if (res >= 0) {
561                 // When using Datagram we should consider the message written as long as res is not negative.
562                 return outboundBuffer.remove();
563             }
564             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
565                 return false;
566             }
567             try {
568                 return ioResult(errormsg, res) != 0;
569             } catch (Throwable cause) {
570                 Throwable e = (connected && cause instanceof NativeIoException) ?
571                         translateForConnected((NativeIoException) cause) : cause;
572                 return outboundBuffer.remove(e);
573             }
574         }
575 
576         @Override
577         void connectComplete(byte op, int res, int flags, short data) {
578             if (res >= 0) {
579                 connected = true;
580             }
581             super.connectComplete(op, res, flags, data);
582         }
583 
584         @Override
585         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
586             return writeProcessor.write(in);
587         }
588 
589         @Override
590         protected int scheduleWriteSingle(Object msg) {
591             return scheduleWrite(msg, true) ? 1 : 0;
592         }
593 
594         private boolean scheduleWrite(Object msg, boolean first) {
595             final ByteBuf data;
596             final InetSocketAddress remoteAddress;
597             final int segmentSize;
598             if (msg instanceof AddressedEnvelope) {
599                 @SuppressWarnings("unchecked")
600                 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
601                         (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
602                 data = envelope.content();
603                 remoteAddress = envelope.recipient();
604                 if (msg instanceof SegmentedDatagramPacket) {
605                     segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
606                 } else {
607                     segmentSize = 0;
608                 }
609             } else {
610                 data = (ByteBuf) msg;
611                 remoteAddress = (InetSocketAddress) remoteAddress();
612                 segmentSize = 0;
613             }
614 
615             long bufferAddress = IoUring.memoryAddress(data);
616             return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
617         }
618 
619         private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
620                                         int bufferLength, int segmentSize, boolean first) {
621             MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
622             if (hdr == null) {
623                 // There is no MsgHdrMemory left to use. We need to submit and wait for the writes to complete
624                 // before we can write again.
625                 return false;
626             }
627             hdr.set(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
628 
629             int fd = fd().intValue();
630             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
631             IoRegistration registration = registration();
632             IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, msgFlags, hdr.address(), hdr.idx());
633             long id = registration.submit(ops);
634             if (id == 0) {
635                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
636                 sendmsgHdrs.restoreNextHdr(hdr);
637                 return false;
638             }
639             sendmsgHdrs.setId(hdr.idx(), id);
640             return true;
641         }
642 
643         @Override
644         public void unregistered() {
645             super.unregistered();
646             sendmsgHdrs.release();
647             recvmsgHdrs.release();
648             assert readBuffer == null;
649         }
650     }
651 
652     private static IOException translateForConnected(NativeIoException e) {
653         // We need to correctly translate connect errors to match NIO behaviour.
654         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
655             PortUnreachableException error = new PortUnreachableException(e.getMessage());
656             error.initCause(e);
657             return error;
658         }
659         return e;
660     }
661 
662     /**
663      * Returns {@code true} if the usage of {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported.
664      *
665      * @return {@code true} if supported, {@code false} otherwise.
666      */
667     public static boolean isSegmentedDatagramPacketSupported() {
668         return IoUring.isAvailable();
669     }
670 
671     @Override
672     protected void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
673         if (numOutstandingReads > 0) {
674             int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
675             assert canceled == numOutstandingReads;
676         }
677     }
678 
679     @Override
680     protected void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
681         if (numOutstandingWrites > 0) {
682             int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
683             assert canceled == numOutstandingWrites;
684         }
685     }
686 
687     private int cancel(IoRegistration registration, byte op, MsgHdrMemoryArray array) {
688         int cancelled = 0;
689         for (int idx = 0; idx < array.length(); idx++) {
690             long id = array.id(idx);
691             if (id == MsgHdrMemoryArray.NO_ID) {
692                 continue;
693             }
694             // Let's try to cancel outstanding op as these might be submitted and waiting for data
695             // (via fastpoll).
696             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, id, op);
697             registration.submit(ops);
698             cancelled++;
699         }
700         return cancelled;
701     }
702 
703     @Override
704     protected boolean socketIsEmpty(int flags) {
705         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
706     }
707 
708     @Override
709     boolean isPollInFirst() {
710         return false;
711     }
712 }