View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package;
18  import io.netty.buffer.ByteBuf;
19  import;
20  import;
21  import;
22  import;
23  import;
24  import;
25  import;
26  import;
27  import;
28  import;
29  import;
30  import;
31  import;
32  import;
33  import;
34  import io.netty.util.UncheckedBooleanSupplier;
35  import io.netty.util.internal.ObjectUtil;
36  import io.netty.util.internal.StringUtil;
38  import;
39  import;
40  import;
41  import;
42  import;
43  import;
44  import;
45  import java.nio.channels.UnresolvedAddressException;
47  import static;
49  public final class IoUringDatagramChannel extends AbstractIoUringChannel implements DatagramChannel {
50      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
51      private static final String EXPECTED_TYPES =
52              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54              StringUtil.simpleClassName(ByteBuf.class) + ", " +
55              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56              StringUtil.simpleClassName(ByteBuf.class) + ')';
58      private final IoUringDatagramChannelConfig config;
59      private volatile boolean connected;
61      // These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg
62      //
63      // TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use
64      //       some sort of other pool. Let's keep it simple for now.
65      //
66      // Consider exposing some configuration for that.
67      private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray((short) 256);
68      private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray((short) 256);
69      private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
/**
 * Creates a new channel without requesting a specific {@link SocketProtocolFamily}; the family is
 * picked according to the operating system's default.
 */
public IoUringDatagramChannel() {
    this((SocketProtocolFamily) null);
}

/**
 * Creates a new channel for the given {@link SocketProtocolFamily}.
 *
 * @param family the protocol family to use, or {@code null} to fall back to the operating system's
 *               default
 */
public IoUringDatagramChannel(SocketProtocolFamily family) {
    this(LinuxSocket.newSocketDgram(useIpv6(family)), false);
}
87      private static boolean useIpv6(SocketProtocolFamily family) {
88          if (family == null) {
89              return Socket.isIPv6Preferred();
90          }
91          return family == SocketProtocolFamily.INET6;
92      }
/**
 * Creates a new instance that wraps an already existing socket, identified by its file descriptor.
 * The channel is created with its {@code active} flag set.
 *
 * @param fd the file descriptor of the datagram socket to wrap
 */
public IoUringDatagramChannel(int fd) {
    this(new LinuxSocket(fd), true);
}

/**
 * Shared constructor used by all public constructors.
 *
 * @param fd     the socket backing this channel
 * @param active the initial active state handed to the super class
 */
private IoUringDatagramChannel(LinuxSocket fd, boolean active) {
    // Always use a blocking fd and so make use of fast-poll.
    super(null, fd, active);
    config = new IoUringDatagramChannelConfig(this);
}
108     @Override
109     public InetSocketAddress remoteAddress() {
110         return (InetSocketAddress) super.remoteAddress();
111     }
113     @Override
114     public InetSocketAddress localAddress() {
115         return (InetSocketAddress) super.localAddress();
116     }
118     @Override
119     public ChannelMetadata metadata() {
120         return METADATA;
121     }
123     @Override
124     public boolean isActive() {
125         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || super.isActive());
126     }
128     @Override
129     public boolean isConnected() {
130         return connected;
131     }
133     @Override
134     public ChannelFuture joinGroup(InetAddress multicastAddress) {
135         return joinGroup(multicastAddress, newPromise());
136     }
138     @Override
139     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
140         try {
141             return joinGroup(
142                     multicastAddress,
143                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
144         } catch (IOException e) {
145             promise.setFailure(e);
146         }
147         return promise;
148     }
150     @Override
151     public ChannelFuture joinGroup(
152             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
153         return joinGroup(multicastAddress, networkInterface, newPromise());
154     }
156     @Override
157     public ChannelFuture joinGroup(
158             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
159             ChannelPromise promise) {
160         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
161     }
163     @Override
164     public ChannelFuture joinGroup(
165             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
166         return joinGroup(multicastAddress, networkInterface, source, newPromise());
167     }
169     @Override
170     public ChannelFuture joinGroup(
171             final InetAddress multicastAddress, final NetworkInterface networkInterface,
172             final InetAddress source, final ChannelPromise promise) {
174         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
175         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
177         try {
178             socket.joinGroup(multicastAddress, networkInterface, source);
179             promise.setSuccess();
180         } catch (IOException e) {
181             promise.setFailure(e);
182         }
183         return promise;
184     }
186     @Override
187     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
188         return leaveGroup(multicastAddress, newPromise());
189     }
191     @Override
192     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
193         try {
194             return leaveGroup(
195                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
196         } catch (IOException e) {
197             promise.setFailure(e);
198         }
199         return promise;
200     }
202     @Override
203     public ChannelFuture leaveGroup(
204             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
205         return leaveGroup(multicastAddress, networkInterface, newPromise());
206     }
208     @Override
209     public ChannelFuture leaveGroup(
210             InetSocketAddress multicastAddress,
211             NetworkInterface networkInterface, ChannelPromise promise) {
212         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
213     }
215     @Override
216     public ChannelFuture leaveGroup(
217             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
218         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
219     }
221     @Override
222     public ChannelFuture leaveGroup(
223             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
224             final ChannelPromise promise) {
225         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
226         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
228         try {
229             socket.leaveGroup(multicastAddress, networkInterface, source);
230             promise.setSuccess();
231         } catch (IOException e) {
232             promise.setFailure(e);
233         }
234         return promise;
235     }
237     @Override
238     public ChannelFuture block(
239             InetAddress multicastAddress, NetworkInterface networkInterface,
240             InetAddress sourceToBlock) {
241         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
242     }
244     @Override
245     public ChannelFuture block(
246             final InetAddress multicastAddress, final NetworkInterface networkInterface,
247             final InetAddress sourceToBlock, final ChannelPromise promise) {
248         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
249         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
250         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
252         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
253         return promise;
254     }
256     @Override
257     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
258         return block(multicastAddress, sourceToBlock, newPromise());
259     }
261     @Override
262     public ChannelFuture block(
263             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
264         try {
265             return block(
266                     multicastAddress,
267                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
268                     sourceToBlock, promise);
269         } catch (Throwable e) {
270             promise.setFailure(e);
271         }
272         return promise;
273     }
275     @Override
276     protected AbstractUnsafe newUnsafe() {
277         return new IoUringDatagramChannelUnsafe();
278     }
280     @Override
281     protected void doBind(SocketAddress localAddress) throws Exception {
282         if (localAddress instanceof InetSocketAddress) {
283             InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
284             if (socketAddress.getAddress().isAnyLocalAddress() &&
285                     socketAddress.getAddress() instanceof Inet4Address) {
286                 if ( == SocketProtocolFamily.INET6) {
287                     localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
288                 }
289             }
290         }
291         super.doBind(localAddress);
292         active = true;
293     }
295     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
296         if (envelope.recipient() instanceof InetSocketAddress
297                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
298             throw new UnresolvedAddressException();
299         }
300     }
302     @Override
303     protected Object filterOutboundMessage(Object msg) {
304         if (msg instanceof DatagramPacket) {
305             DatagramPacket packet = (DatagramPacket) msg;
306             checkUnresolved(packet);
307             ByteBuf content = packet.content();
308             return !content.hasMemoryAddress() ?
309                     packet.replace(newDirectBuffer(packet, content)) : msg;
310         }
312         if (msg instanceof ByteBuf) {
313             ByteBuf buf = (ByteBuf) msg;
314             return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
315         }
317         if (msg instanceof AddressedEnvelope) {
318             @SuppressWarnings("unchecked")
319             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
320             checkUnresolved(e);
321             if (e.content() instanceof ByteBuf &&
322                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
324                 ByteBuf content = (ByteBuf) e.content();
325                 return !content.hasMemoryAddress()?
326                         new DefaultAddressedEnvelope<>(
327                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
328             }
329         }
331         throw new UnsupportedOperationException(
332                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
333     }
335     @Override
336     public DatagramChannelConfig config() {
337         return config;
338     }
340     @Override
341     protected void doDisconnect() throws Exception {
342         // TODO: use io_uring for this too...
343         socket.disconnect();
344         connected = active = false;
346         resetCachedAddresses();
347     }
349     @Override
350     protected void doClose() throws Exception {
351         super.doClose();
352         connected = false;
353     }
355     private final class IoUringDatagramChannelUnsafe extends AbstractUringUnsafe {
356         private final WriteProcessor writeProcessor = new WriteProcessor();
358         private ByteBuf readBuffer;
360         private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
361             private int written;
362             @Override
363             public boolean processMessage(Object msg) {
364                 if (scheduleWrite(msg, written == 0)) {
365                     written++;
366                     return true;
367                 }
368                 return false;
369             }
371             int write(ChannelOutboundBuffer in) {
372                 written = 0;
373                 try {
374                     in.forEachFlushedMessage(this);
375                 } catch (Exception e) {
376                     // This should never happen as our processMessage(...) never throws.
377                     throw new IllegalStateException(e);
378                 }
379                 return written;
380             }
381         }
383         @Override
384         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
385             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
386             final ChannelPipeline pipeline = pipeline();
387             ByteBuf byteBuf = this.readBuffer;
388             assert byteBuf != null;
389             try {
390                 recvmsgComplete(pipeline, allocHandle, byteBuf, res, flags, data, outstanding);
391             } catch (Throwable t) {
392                 if (connected && t instanceof NativeIoException) {
393                     t = translateForConnected((NativeIoException) t);
394                 }
395                 pipeline.fireExceptionCaught(t);
396             }
397         }
399         private void recvmsgComplete(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
400                                       ByteBuf byteBuf, int res, int flags, int idx, int outstanding)
401                 throws IOException {
402             MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
403             if (res < 0) {
404                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
405                     // If res is negative we should pass it to ioResult(...) which will either throw
406                     // or convert it to 0 if we could not read because the socket was not readable.
407                     allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
408                 }
409             } else {
410                 allocHandle.lastBytesRead(res);
411                 if (hdr.hasPort(IoUringDatagramChannel.this)) {
412                     allocHandle.incMessagesRead(1);
413                     DatagramPacket packet =
414                             IoUringDatagramChannel.this, (IoUringIoHandler) registration().ioHandler(), byteBuf, res);
415                     pipeline.fireChannelRead(packet);
416                 }
417             }
419             // Reset the id as this read was completed and so don't need to be cancelled later.
420             recvmsgHdrs.setId(idx, MsgHdrMemoryArray.NO_ID);
421             if (outstanding == 0) {
422                 // There are no outstanding completion events, release the readBuffer and see if we need to schedule
423                 // another one or if the user will do it.
424                 this.readBuffer.release();
425                 this.readBuffer = null;
426                 recvmsgHdrs.clear();
428                 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
429                     if (allocHandle.lastBytesRead() > 0 &&
430                             allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER) &&
431                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
432                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
433                             // that the next read (which would be using Native.MSG_DONTWAIT) will complete without
434                             // be able to read any data. This is useless work and we can skip it.
435                             (!IoUring.isIOUringCqeFSockNonEmptySupported() ||
436                                     (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0)) {
437                         // Let's schedule another read.
438                         scheduleRead(false);
439                     } else {
440                         // the read was completed with EAGAIN.
441                         allocHandle.readComplete();
442                         pipeline.fireChannelReadComplete();
443                     }
444                 }
445             }
446         }
448         @Override
449         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
450             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
451             ByteBuf byteBuf = allocHandle.allocate(alloc());
452             assert readBuffer == null;
453             readBuffer = byteBuf;
455             int writable = byteBuf.writableBytes();
456             allocHandle.attemptedBytesRead(writable);
457             int datagramSize = ((IoUringDatagramChannelConfig) config()).getMaxDatagramPayloadSize();
459             int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
461             int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
462             if (scheduled == 0) {
463                 // We could not schedule any recvmmsg so we need to release the buffer as there will be no
464                 // completion event.
465                 readBuffer = null;
466                 byteBuf.release();
467             }
468             return scheduled;
469         }
471         private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
472             int writable = byteBuf.writableBytes();
473             long bufferAddress = byteBuf.memoryAddress() + byteBuf.writerIndex();
474             if (numDatagram <= 1) {
475                 return scheduleRecvmsg0(bufferAddress, writable, true) ? 1 : 0;
476             }
477             int i = 0;
478             // Add multiple IORING_OP_RECVMSG to the submission queue. This basically emulates recvmmsg(...)
479             for (; i < numDatagram && writable >= datagramSize; i++) {
480                 if (!scheduleRecvmsg0(bufferAddress, datagramSize, i == 0)) {
481                     break;
482                 }
483                 bufferAddress += datagramSize;
484                 writable -= datagramSize;
485             }
486             return i;
487         }
489         private boolean scheduleRecvmsg0(long bufferAddress, int bufferLength, boolean first) {
490             MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
491             if (msgHdrMemory == null) {
492                 // We can not continue reading before we did not submit the recvmsg(s) and received the results.
493                 return false;
494             }
495             msgHdrMemory.write(socket, null, bufferAddress, bufferLength, (short) 0);
497             int fd = fd().intValue();
498             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
499             IoUringIoRegistration registration = registration();
500             // We always use idx here so we can detect if no idx was used by checking if data < 0 in
501             // readComplete0(...)
502             IoUringIoOps ops = IoUringIoOps.newRecvmsg(
503                     fd, flags((byte) 0), msgFlags, msgHdrMemory.address(), msgHdrMemory.idx());
504             long id = registration.submit(ops);
505             if (id == 0) {
506                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
507                 recvmsgHdrs.restoreNextHdr(msgHdrMemory);
508                 return false;
509             }
510             recvmsgHdrs.setId(msgHdrMemory.idx(), id);
511             return true;
512         }
514         @Override
515         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
516             ChannelOutboundBuffer outboundBuffer = outboundBuffer();
518             // Reset the id as this write was completed and so don't need to be cancelled later.
519             sendmsgHdrs.setId(data, MsgHdrMemoryArray.NO_ID);
520             sendmsgResArray[data] = res;
521             // Store the result so we can handle it as soon as we have no outstanding writes anymore.
522             if (outstanding == 0) {
523                 // All writes are done as part of a batch. Let's remove these from the ChannelOutboundBuffer
524                 boolean writtenSomething = false;
525                 int numWritten = sendmsgHdrs.length();
526                 sendmsgHdrs.clear();
527                 for (int i = 0; i < numWritten; i++) {
528                     writtenSomething |= removeFromOutboundBuffer(
529                             outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
530                 }
531                 return writtenSomething;
532             }
533             return true;
534         }
536         private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
537             if (res >= 0) {
538                 // When using Datagram we should consider the message written as long as res is not negative.
539                 return outboundBuffer.remove();
540             }
541             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
542                 return false;
543             }
544             try {
545                 return ioResult(errormsg, res) != 0;
546             } catch (Throwable cause) {
547                 return outboundBuffer.remove(cause);
548             }
549         }
551         @Override
552         void connectComplete(byte op, int res, int flags, short data) {
553             if (res >= 0) {
554                 connected = true;
555             }
556             super.connectComplete(op, res, flags, data);
557         }
559         @Override
560         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
561             return writeProcessor.write(in);
562         }
564         @Override
565         protected int scheduleWriteSingle(Object msg) {
566             return scheduleWrite(msg, true) ? 1 : 0;
567         }
569         private boolean scheduleWrite(Object msg, boolean first) {
570             final ByteBuf data;
571             final InetSocketAddress remoteAddress;
572             final int segmentSize;
573             if (msg instanceof AddressedEnvelope) {
574                 @SuppressWarnings("unchecked")
575                 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
576                         (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
577                 data = envelope.content();
578                 remoteAddress = envelope.recipient();
579                 if (msg instanceof SegmentedDatagramPacket) {
580                     segmentSize = ((SegmentedDatagramPacket) msg).segmentSize();
581                 } else {
582                     segmentSize = 0;
583                 }
584             } else {
585                 data = (ByteBuf) msg;
586                 remoteAddress = (InetSocketAddress) remoteAddress();
587                 segmentSize = 0;
588             }
590             long bufferAddress = data.memoryAddress();
591             return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes(), segmentSize, first);
592         }
594         private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress,
595                                         int bufferLength, int segmentSize, boolean first) {
596             MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
597             if (hdr == null) {
598                 // There is no MsgHdrMemory left to use. We need to submit and wait for the writes to complete
599                 // before we can write again.
600                 return false;
601             }
602             hdr.write(socket, remoteAddress, bufferAddress, bufferLength, (short) segmentSize);
604             int fd = fd().intValue();
605             int msgFlags = first ? 0 : Native.MSG_DONTWAIT;
606             IoUringIoRegistration registration = registration();
607             IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, flags((byte) 0), msgFlags, hdr.address(), hdr.idx());
608             long id = registration.submit(ops);
609             if (id == 0) {
610                 // Submission failed we don't used the MsgHdrMemory and so should give it back.
611                 sendmsgHdrs.restoreNextHdr(hdr);
612                 return false;
613             }
614             sendmsgHdrs.setId(hdr.idx(), id);
615             return true;
616         }
618         @Override
619         protected void freeResourcesNow(IoUringIoRegistration reg) {
620             sendmsgHdrs.release();
621             recvmsgHdrs.release();
622             super.freeResourcesNow(reg);
623         }
624     }
626     private static IOException translateForConnected(NativeIoException e) {
627         // We need to correctly translate connect errors to match NIO behaviour.
628         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
629             PortUnreachableException error = new PortUnreachableException(e.getMessage());
630             error.initCause(e);
631             return error;
632         }
633         return e;
634     }
636     /**
637      * Returns {@code true} if the usage of {@link} is supported.
638      *
639      * @return {@code true} if supported, {@code false} otherwise.
640      */
641     public static boolean isSegmentedDatagramPacketSupported() {
642         return IoUring.isAvailable();
643     }
645     @Override
646     protected void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
647         if (numOutstandingReads > 0) {
648             int canceled = cancel(registration, Native.IORING_OP_RECVMSG, recvmsgHdrs);
649             assert canceled == numOutstandingReads;
650         }
651     }
653     @Override
654     protected void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
655         if (numOutstandingWrites > 0) {
656             int canceled = cancel(registration, Native.IORING_OP_SENDMSG, sendmsgHdrs);
657             assert canceled == numOutstandingWrites;
658         }
659     }
661     private int cancel(IoUringIoRegistration registration, byte op, MsgHdrMemoryArray array) {
662         int cancelled = 0;
663         int fd = fd().intValue();
664         for (int idx = 0; idx < array.length(); idx++) {
665             long id =;
666             if (id == MsgHdrMemoryArray.NO_ID) {
667                 continue;
668             }
669             // Let's try to cancel outstanding op as these might be submitted and waiting for data
670             // (via fastpoll).
671             IoUringIoOps ops = IoUringIoOps.newAsyncCancel(fd, flags((byte) 0), id, op);
672             registration.submit(ops);
673             cancelled++;
674         }
675         return cancelled;
676     }
678     @Override
679     protected boolean socketIsEmpty(int flags) {
680         return IoUring.isIOUringCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
681     }
682 }