View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultAddressedEnvelope;
26  import io.netty.channel.socket.DatagramChannel;
27  import io.netty.channel.socket.DatagramChannelConfig;
28  import io.netty.channel.socket.DatagramPacket;
29  import io.netty.channel.socket.SocketProtocolFamily;
30  import io.netty.channel.unix.Errors;
31  import io.netty.channel.unix.Errors.NativeIoException;
32  import io.netty.channel.unix.SegmentedDatagramPacket;
33  import io.netty.channel.unix.Socket;
34  import io.netty.util.UncheckedBooleanSupplier;
35  import io.netty.util.internal.ObjectUtil;
36  import io.netty.util.internal.StringUtil;
37  
38  import java.io.IOException;
39  import java.net.Inet4Address;
40  import java.net.InetAddress;
41  import java.net.InetSocketAddress;
42  import java.net.NetworkInterface;
43  import java.net.PortUnreachableException;
44  import java.net.SocketAddress;
45  import java.nio.channels.UnresolvedAddressException;
46  
47  import static io.netty.channel.unix.Errors.ioResult;
48  
49  public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
50      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
51      private static final String EXPECTED_TYPES =
52              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54              StringUtil.simpleClassName(ByteBuf.class) + ", " +
55              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56              StringUtil.simpleClassName(ByteBuf.class) + ')';
57  
58      private final IoUringDatagramChannelConfig config;
59      private volatile boolean connected;
60  
61      // These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg
62      //
63      // TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use
64      //       some sort of other pool. Let's keep it simple for now.
65      //
66      // Consider exposing some configuration for that.
67      private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
68      private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
69      private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
70  
71      /**
72       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
73       * on the Operation Systems default which will be chosen.
74       */
75      public IoUringDatagramChannel() {
76          this(null);
77      }
78  
79      /**
80       * Create a new instance using the given {@link SocketProtocolFamily}. If {@code null} is used it will depend
81       * on the Operation Systems default which will be chosen.
82       */
83      public IoUringDatagramChannel(SocketProtocolFamily family) {
84          this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
85      }
86  
87      private static boolean useIpv6(SocketProtocolFamily family) {
88          if (family == null) {
89              return Socket.isIPv6Preferred();
90          }
91          return family == SocketProtocolFamily.INET6;
92      }
93  
94      /**
95       * Create a new instance which selects the {@link SocketProtocolFamily} to use depending
96       * on the Operation Systems default which will be chosen.
97       */
98      public IoUringDatagramChannel(int fd) {
99          this(new LinuxSocket(fd), true);
100     }
101 
102     private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
103         // Always use a blocking fd and so make use of fast-poll.
104         super(null, LinuxSocket.makeBlocking(fd), active);
105         config = new IoUringDatagramChannelConfig(this);
106     }
107 
108     @Override
109     public InetSocketAddress remoteAddress() {
110         return (InetSocketAddress) super.remoteAddress();
111     }
112 
113     @Override
114     public InetSocketAddress localAddress() {
115         return (InetSocketAddress) super.localAddress();
116     }
117 
118     @Override
119     public ChannelMetadata metadata() {
120         return METADATA;
121     }
122 
123     @Override
124     public boolean isActive() {
125         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
126     }
127 
128     @Override
129     public boolean isConnected() {
130         return connected;
131     }
132 
133     @Override
134     public ChannelFuture joinGroup(InetAddress multicastAddress) {
135         return joinGroup(multicastAddress, newPromise());
136     }
137 
138     @Override
139     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
140         try {
141             return joinGroup(
142                     multicastAddress,
143                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
144         } catch (IOException e) {
145             promise.setFailure(e);
146         }
147         return promise;
148     }
149 
150     @Override
151     public ChannelFuture joinGroup(
152             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
153         return joinGroup(multicastAddress, networkInterface, newPromise());
154     }
155 
156     @Override
157     public ChannelFuture joinGroup(
158             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
159             ChannelPromise promise) {
160         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
161     }
162 
163     @Override
164     public ChannelFuture joinGroup(
165             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
166         return joinGroup(multicastAddress, networkInterface, source, newPromise());
167     }
168 
169     @Override
170     public ChannelFuture joinGroup(
171             final InetAddress multicastAddress, final NetworkInterface networkInterface,
172             final InetAddress source, final ChannelPromise promise) {
173 
174         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
175         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
176 
177         try {
178             socket.joinGroup(multicastAddress, networkInterface, source);
179             promise.setSuccess();
180         } catch (IOException e) {
181             promise.setFailure(e);
182         }
183         return promise;
184     }
185 
186     @Override
187     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
188         return leaveGroup(multicastAddress, newPromise());
189     }
190 
191     @Override
192     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
193         try {
194             return leaveGroup(
195                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
196         } catch (IOException e) {
197             promise.setFailure(e);
198         }
199         return promise;
200     }
201 
202     @Override
203     public ChannelFuture leaveGroup(
204             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
205         return leaveGroup(multicastAddress, networkInterface, newPromise());
206     }
207 
208     @Override
209     public ChannelFuture leaveGroup(
210             InetSocketAddress multicastAddress,
211             NetworkInterface networkInterface, ChannelPromise promise) {
212         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
213     }
214 
215     @Override
216     public ChannelFuture leaveGroup(
217             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
218         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
219     }
220 
221     @Override
222     public ChannelFuture leaveGroup(
223             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
224             final ChannelPromise promise) {
225         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
226         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
227 
228         try {
229             socket.leaveGroup(multicastAddress, networkInterface, source);
230             promise.setSuccess();
231         } catch (IOException e) {
232             promise.setFailure(e);
233         }
234         return promise;
235     }
236 
237     @Override
238     public ChannelFuture block(
239             InetAddress multicastAddress, NetworkInterface networkInterface,
240             InetAddress sourceToBlock) {
241         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
242     }
243 
244     @Override
245     public ChannelFuture block(
246             final InetAddress multicastAddress, final NetworkInterface networkInterface,
247             final InetAddress sourceToBlock, final ChannelPromise promise) {
248         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
249         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
250         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
251 
252         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
253         return promise;
254     }
255 
256     @Override
257     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
258         return block(multicastAddress, sourceToBlock, newPromise());
259     }
260 
261     @Override
262     public ChannelFuture block(
263             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
264         try {
265             return block(
266                     multicastAddress,
267                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
268                     sourceToBlock, promise);
269         } catch (Throwable e) {
270             promise.setFailure(e);
271         }
272         return promise;
273     }
274 
275     @Override
276     protected AbstractUnsafe newUnsafe() {
277         return new IoUringDatagramChannelUnsafe();
278     }
279 
280     @Override
281     protected void doBind(SocketAddress localAddress) throws Exception {
282         if (localAddress instanceof InetSocketAddress) {
283             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
284             if (socketAddress.getAddress().isAnyLocalAddress() &&
285                     socketAddress.getAddress() instanceof Inet4Address) {
286                 if (socket.family() == SocketProtocolFamily.INET6) {
287                     localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
288                 }
289             }
290         }
291         super.doBind(localAddress);
292         active = true;
293     }
294 
295     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
296         if (envelope.recipient() instanceof InetSocketAddress
297                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
298             throw new UnresolvedAddressException();
299         }
300     }
301 
302     @Override
303     protected Object filterOutboundMessage(Object msg) {
304         if (msg instanceof DatagramPacket) {
305             DatagramPacket packet = (DatagramPacket) msg;
306             checkUnresolved(packet);
307             ByteBuf content = packet.content();
308             return !content.hasMemoryAddress() ?
309                     packet.replace(newDirectBuffer(packet, content)) : msg;
310         }
311 
312         if (msg instanceof ByteBuf) {
313             ByteBuf buf = (ByteBuf) msg;
314             return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
315         }
316 
317         if (msg instanceof AddressedEnvelope) {
318             @SuppressWarnings("unchecked")
319             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
320             checkUnresolved(e);
321             if (e.content() instanceof ByteBuf &&
322                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
323 
324                 ByteBuf content = (ByteBuf) e.content();
325                 return !content.hasMemoryAddress()?
326                         new DefaultAddressedEnvelope<>(
327                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
328             }
329         }
330 
331         throw new UnsupportedOperationException(
332                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
333     }
334 
335     @Override
336     public DatagramChannelConfig config() {
337         return config;
338     }
339 
340     @Override
341     protected void doDisconnect() throws Exception {
342         // TODO: use io_uring for this too...
343         socket.disconnect();
344         connected = active = false;
345 
346         resetCachedAddresses();
347     }
348 
349     @Override
350     protected void doClose() throws Exception {
351         super.doClose();
352         connected = false;
353     }
354 
355     private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
356         private final WriteProcessor writeProcessor = new WriteProcessor();
357 
358         private ByteBuf readBuffer;
359 
360         private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
361             private int written;
362             @Override
363             public boolean processMessage(Object msg) {
364                 if (scheduleWrite(msg, written == 0)) {
365                     written++;
366                     return true;
367                 }
368                 return false;
369             }
370 
371             int write(ChannelOutboundBuffer in) {
372                 written = 0;
373                 try {
374                     in.forEachFlushedMessage(this);
375                 } catch (Exception e) {
376                     // This should never happen as our processMessage(...) never throws.
377                     throw new IllegalStateException(e);
378                 }
379                 return written;
380             }
381         }
382 
383         @Override
384         protected void readComplete0(int res, int flags, int data, int outstanding) {
385             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
386             final ChannelPipeline pipeline = pipeline();
387             ByteBuf byteBuf = this.readBuffer;
388             assert byteBuf != null;
389             try {
390                 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
391             } catch (Throwable t) {
392                 if (connected && t instanceof NativeIoException) {
393                     t = translateForConnected((NativeIoException) t);
394                 }
395                 pipeline.fireExceptionCaught(t);
396             }
397         }
398 
399         private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
400                                       ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
401                 throws IOException {
402             MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
403             if (res < 0) {
404                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
405                     // If res is negative we should pass it to ioResult(...) which will either throw
406                     // or convert it to 0 if we could not read because the socket was not readable.
407                     allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
408                 }
409             } else {
410                 allocHandle.lastBytesRead(res);
411                 if (hdr.hasPort(IoUringDatagramChannel.this)) {
412                     allocHandle.incMessagesRead(1);
413                     DatagramPacket packet = hdr.read(
414                             IoUringDatagramChannel.this, (IoUringIoHandler) registration().ioHandler(), byteBuf, res);
415                     pipeline.fireChannelRead(packet);
416                 }
417             }
418 
419             // Reset the id as this read was completed and so don't need to be cancelled later.
420             recvmsgHdrs.setId(idx, -1);
421             if (outstanding == 0) {
422                 // There are no outstanding completion events, release the readBuffer and see if we need to schedule
423                 // another one or if the user will do it.
424                 this.readBuffer.release();
425                 this.readBuffer = null;
426                 recvmsgHdrs.clear();
427 
428                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
429                     if (allocHandle.lastBytesRead() > 0 &&
430                             allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
431                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
432                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
433                             // that the next read (which would be using Native.MSG_DONTWAIT) will complete without
434                             // be able to read any data. This is useless work and we can skip it.
435                             (!IoUring.isIOUringCqeFSockNonEmptySupported() ||
436                                     (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
437                         // Let's schedule another read.
438                         scheduleRead(false);
439                     } else {
440                         // the read was completed with EAGAIN.
441                         allocHandle.readComplete();
442                         pipeline.fireChannelReadComplete();
443                     }
444                 }
445             }
446         }
447 
448         @Override
449         protected int scheduleRead0(boolean first) {
450             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
451             ByteBuf byteBuf = allocHandle.allocate(alloc());
452             assert readBuffer == null;
453             readBuffer = byteBuf;
454 
455             int writable = byteBuf.writableBytes();
456             allocHandle.attemptedBytesRead(writable);
457             int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
458 
459             int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
460 
461             int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
462             if (scheduled == 0) {
463                 // We could not schedule any recvmmsg so we need to release the buffer as there will be no
464                 // completion event.
465                 readBuffer = null;
466                 byteBuf.release();
467             }
468             return scheduled;
469         }
470 
471         private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
472             int writable = byteBuf.writableBytes();
473             long bufferAddress = byteBuf.memoryAddress() + byteBuf.writerIndex();
474             if (numDatagram <= 1) {
475                 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
476             }
477             int i = 0;
478             // Add multiple IORING_OP_RECVMSG to the submission queue. This basically emulates recvmmsg(...)
479             for (; i < numDatagram && writable >= datagramSize; i++) {
480                 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
481                     break;
482                 }
483                 bufferAddress += datagramSize;
484                 writable -= datagramSize;
485             }
486             return i;
487         }
488 
489         private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
490             MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
491             if (msgHdrMemory == null) {
492                 // We can not continue reading before we did not submit the recvmsg(s) and received the results.
493                 return false;
494             }
495             msgHdrMemory.write(socket, null, bufferAddress, bufferLength, (short) 0);
496 
497             int fd = fd().intValue();
498             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
499             IoUringIoRegistration registration = registration();
500             // We always use idx here so we can detect if no idx was used by checking if data < 0 in
501             // readComplete0(...)
502             IoUringIoOps ops = IoUringIoOps.newRecvmsg(fd, 0, msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
503             long id = registration.submit(ops);
504             recvmsgHdrs.setId(msgHdrMemory.idx(), id);
505             return true;
506         }
507 
508         @Override
509         boolean writeComplete0(int res, int flags, int data, int outstanding) {
510             ChannelOutboundBuffer outboundBuffer = outboundBuffer();
511 
512             // Reset the id as this write was completed and so don't need to be cancelled later.
513             sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
514             sendmsgResArray[data] = res;
515             // Store the result so we can handle it as soon as we have no outstanding writes anymore.
516             if (outstanding == 0) {
517                 // All writes are done as part of a batch. Let's remove these from the ChannelOutboundBuffer
518                 boolean writtenSomething = false;
519                 int numWritten = sendmsgHdrs.length();
520                 sendmsgHdrs.clear();
521                 for (int i = 0; i < numWritten; i++) {
522                     writtenSomething |= removeFromOutboundBuffer(
523                             outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
524                 }
525                 return writtenSomething;
526             }
527             return true;
528         }
529 
530         private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
531             if (res >= 0) {
532                 // When using Datagram we should consider the message written as long as res is not negative.
533                 return outboundBuffer.remove();
534             }
535             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
536                 return false;
537             }
538             try {
539                 return ioResult(errormsg, res) != 0;
540             } catch (Throwable cause) {
541                 return outboundBuffer.remove(cause);
542             }
543         }
544 
545         @Override
546         void connectComplete(int res, int flags, short data) {
547             if (res >= 0) {
548                 connected = true;
549             }
550             super.connectComplete(res, flags, data);
551         }
552 
553         @Override
554         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
555             return writeProcessor.write(in);
556         }
557 
558         @Override
559         protected int scheduleWriteSingle(Object msg) {
560             return scheduleWrite(msg, true) ? 1 : 0;
561         }
562 
563         private boolean scheduleWrite(Object msg, boolean first) {
564             final ByteBuf data;
565             final InetSocketAddress remoteAddress;
566             final int segmentSize;
567             if (msg instanceof AddressedEnvelope) {
568                 @SuppressWarnings("unchecked")
569                 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
570                         (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
571                 data = envelope.content();
572                 remoteAddress = envelope.recipient();
573                 if (msg instanceof SegmentedDatagramPacket) {
574                     segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
575                 } else {
576                     segmentSize = 0;
577                 }
578             } else {
579                 data = (ByteBuf) msg;
580                 remoteAddress = (InetSocketAddress) remoteAddress();
581                 segmentSize = 0;
582             }
583 
584             long bufferAddress = data.memoryAddress();
585             return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
586         }
587 
588         private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
589                                         int bufferLength, int segmentSize, boolean first) {
590             MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
591             if (hdr == null) {
592                 // There is no MsgHdrMemory left to use. We need to submit and wait for the writes to complete
593                 // before we can write again.
594                 return false;
595             }
596             hdr.write(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
597 
598             int fd = fd().intValue();
599             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
600             IoUringIoRegistration registration = registration();
601             IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, 0, msgFlags, hdr.address(), hdr.idx());
602             long id = registration.submit(ops);
603             sendmsgHdrs.setId(hdr.idx(), id);
604             return true;
605         }
606 
607         @Override
608         protected void freeResourcesNow(IoUringIoRegistration reg) {
609             sendmsgHdrs.release();
610             recvmsgHdrs.release();
611             super.freeResourcesNow(reg);
612         }
613     }
614 
615     private static IOException translateForConnected(NativeIoException e) {
616         // We need to correctly translate connect errors to match NIO behaviour.
617         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
618             PortUnreachableException error = new PortUnreachableException(e.getMessage());
619             error.initCause(e);
620             return error;
621         }
622         return e;
623     }
624 
625     /**
626      * Returns {@code true} if the usage of {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported.
627      *
628      * @return {@code true} if supported, {@code false} otherwise.
629      */
630     public static boolean isSegmentedDatagramPacketSupported() {
631         return IoUring.isAvailable();
632     }
633 
634     @Override
635     protected void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
636         if (numOutstandingReads > 0) {
637             int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
638             assert canceled == numOutstandingReads;
639         }
640     }
641 
642     @Override
643     protected void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
644         if (numOutstandingWrites > 0) {
645             int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
646             assert canceled == numOutstandingWrites;
647         }
648     }
649 
650     private int cancel(IoUringIoRegistration registration, byte op, MsgHdrMemoryArray array) {
651         int cancelled = 0;
652         int fd = fd().intValue();
653         for (int idx = 0; idx < array.length(); idx++) {
654             long id = array.id(idx);
655             if (id == MsgHdrMemoryArray.NO_ID) {
656                 continue;
657             }
658             // Let's try to cancel outstanding op as these might be submitted and waiting for data
659             // (via fastpoll).
660             IoUringIoOps ops = IoUringIoOps.newAsyncCancel(fd, 0, id, op);
661             registration.submit(ops);
662             cancelled++;
663         }
664         return cancelled;
665     }
666 }