View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoHandle;
19  import io.netty.channel.IoHandler;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandlerFactory;
22  import io.netty.channel.IoOps;
23  import io.netty.channel.IoRegistration;
24  import io.netty.channel.unix.Buffer;
25  import io.netty.channel.unix.Errors;
26  import io.netty.channel.unix.FileDescriptor;
27  import io.netty.channel.unix.IovArray;
28  import io.netty.util.collection.IntObjectHashMap;
29  import io.netty.util.collection.IntObjectMap;
30  import io.netty.util.concurrent.ThreadAwareExecutor;
31  import io.netty.util.internal.CleanableDirectBuffer;
32  import io.netty.util.internal.ObjectUtil;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.io.UncheckedIOException;
39  import java.nio.ByteBuffer;
40  import java.util.ArrayList;
41  import java.util.Collection;
42  import java.util.List;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  
46  import static java.lang.Math.max;
47  import static java.lang.Math.min;
48  import static java.util.Objects.requireNonNull;
49  
50  /**
51   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
52   */
53  public final class IoUringIoHandler implements IoHandler {
54      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55  
56      private final RingBuffer ringBuffer;
57      private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58      private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59      // The maximum number of bytes for an InetAddress / Inet6Address
60      private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61      private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62  
63      private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64      private final FileDescriptor eventfd;
65      private final CleanableDirectBuffer eventfdReadBufCleanable;
66      private final ByteBuffer eventfdReadBuf;
67      private final long eventfdReadBufAddress;
68      private final CleanableDirectBuffer timeoutMemoryCleanable;
69      private final ByteBuffer timeoutMemory;
70      private final long timeoutMemoryAddress;
71      private final IovArray iovArray;
72      private final MsgHdrMemoryArray msgHdrMemoryArray;
73      private long eventfdReadSubmitted;
74      private boolean eventFdClosing;
75      private volatile boolean shuttingDown;
76      private boolean closeCompleted;
77      private int nextRegistrationId = Integer.MIN_VALUE;
78  
79      // these two ids are used internally any so can't be used by nextRegistrationId().
80      private static final int EVENTFD_ID = Integer.MAX_VALUE;
81      private static final int RINGFD_ID = EVENTFD_ID - 1;
82      private static final int INVALID_ID = 0;
83  
84      private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
85  
86      private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
87      private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
88  
89      private final ThreadAwareExecutor executor;
90  
91      IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
92          // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
93          IoUring.ensureAvailability();
94          this.executor = requireNonNull(executor, "executor");
95          requireNonNull(config, "config");
96          int setupFlags = Native.setupFlags(config.singleIssuer());
97  
98          //The default cq size is always twice the ringSize.
99          // It only makes sense when the user actually specifies the cq ring size.
100         int cqSize = 2 * config.getRingSize();
101         if (config.needSetupCqeSize()) {
102             assert IoUring.isSetupCqeSizeSupported();
103             setupFlags |= Native.IORING_SETUP_CQSIZE;
104             cqSize = config.getCqSize();
105         }
106         this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
107         if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
108             int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
109             int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
110             int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
111             if (result < 0) {
112                 // Close ringBuffer before throwing to ensure we release all memory on failure.
113                 ringBuffer.close();
114                 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
115             }
116         }
117 
118         registeredIoUringBufferRing = new IntObjectHashMap<>();
119         Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
120         if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
121             for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
122                 try {
123                     IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
124                     registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
125                 } catch (Errors.NativeIoException e) {
126                     for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
127                         bufferRing.close();
128                     }
129                     // Close ringBuffer before throwing to ensure we release all memory on failure.
130                     ringBuffer.close();
131                     throw new UncheckedIOException(e);
132                 }
133             }
134         }
135 
136         registrations = new IntObjectHashMap<>();
137         eventfd = Native.newBlockingEventFd();
138         eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
139         eventfdReadBuf = eventfdReadBufCleanable.buffer();
140         eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
141         timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
142         timeoutMemory = timeoutMemoryCleanable.buffer();
143         timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
144         iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
145         msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
146     }
147 
148     @Override
149     public void initialize() {
150         ringBuffer.enable();
151         // Fill all buffer rings now.
152         for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
153             bufferRing.initialize();
154         }
155     }
156 
157     @Override
158     public int run(IoHandlerContext context) {
159         if (closeCompleted) {
160             if (context.shouldReportActiveIoTime()) {
161                 context.reportActiveIoTime(0);
162             }
163             return 0;
164         }
165         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
166         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
167         if (!completionQueue.hasCompletions() && context.canBlock()) {
168             if (eventfdReadSubmitted == 0) {
169                 submitEventFdRead();
170             }
171             long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
172             submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
173         } else {
174             // Even if we have some completions already pending we can still try to even fetch more.
175             submitAndClearNow(submissionQueue);
176         }
177 
178         int processed;
179         if (context.shouldReportActiveIoTime()) {
180             // Timer starts after the blocking wait, around the processing of completions.
181             long activeIoStartTimeNanos = System.nanoTime();
182             processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
183             long activeIoEndTimeNanos = System.nanoTime();
184             context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
185         } else {
186             processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
187         }
188         return processed;
189     }
190 
191     private boolean needSubmit(int sqFlags) {
192         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
193         return submissionQueue.count() > 0
194                 || (sqFlags & (Native.IORING_SQ_CQ_OVERFLOW | Native.IORING_SQ_TASKRUN)) != 0;
195     }
196 
197     private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
198                                          CompletionCallback callback) {
199         int processed = 0;
200         // Bound the maximum number of times this will loop before we return and so execute some non IO stuff.
201         // 128 here is just some sort of bound and another number might be ok as well.
202         for (int i = 0; i < 128; i++) {
203             int p = completionQueue.process(callback);
204             int sqFlags = submissionQueue.flags();
205             if ((sqFlags & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
206                 logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
207                         completionQueue.ringEntries);
208             }
209             if (p == 0) {
210                 if (!needSubmit(sqFlags)) {
211                     break;
212                 }
213                 submitAndClearNow0(submissionQueue);
214             }
215             processed += p;
216         }
217         return processed;
218     }
219 
220     private int submitAndClearNow(SubmissionQueue submissionQueue) {
221         if (needSubmit(submissionQueue.flags())) {
222             return submitAndClearNow0(submissionQueue);
223         }
224         return 0;
225     }
226 
227     private int submitAndClearNow0(SubmissionQueue submissionQueue) {
228 
229         int submitted = submissionQueue.submitAndGetNow();
230 
231         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
232         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
233         iovArray.clear();
234         msgHdrMemoryArray.clear();
235         return submitted;
236     }
237 
238     private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
239             throws Errors.NativeIoException {
240         short bufferRingSize = bufferRingConfig.bufferRingSize();
241         short bufferGroupId = bufferRingConfig.bufferGroupId();
242         int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
243         long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
244         if (ioUringBufRingAddr < 0) {
245             throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
246         }
247         return new IoUringBufferRing(ringFd,
248                 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
249                 bufferRingSize, bufferRingConfig.batchSize(),
250                 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
251                 bufferRingConfig.isBatchAllocation()
252         );
253     }
254 
255     IoUringBufferRing findBufferRing(short bgId) {
256         IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
257         if (cached != null) {
258             return cached;
259         }
260         throw new IllegalArgumentException(
261                 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
262         );
263     }
264 
265     private static void handleLoopException(Throwable throwable) {
266         logger.warn("Unexpected exception in the IO event loop.", throwable);
267 
268         // Prevent possible consecutive immediate failures that lead to
269         // excessive CPU consumption.
270         try {
271             Thread.sleep(100);
272         } catch (InterruptedException ignore) {
273             // ignore
274         }
275     }
276 
277     private void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
278         try {
279             int id = UserData.decodeId(udata);
280             byte op = UserData.decodeOp(udata);
281             short data = UserData.decodeData(udata);
282 
283             if (logger.isTraceEnabled()) {
284                 logger.trace("completed(ring {}): {}(id={}, res={})",
285                         ringBuffer.fd(), Native.opToStr(op), data, res);
286             }
287             if (id == EVENTFD_ID) {
288                 handleEventFdRead();
289                 return;
290             }
291             if (id == RINGFD_ID) {
292                 // Just return
293                 return;
294             }
295             DefaultIoUringIoRegistration registration = registrations.get(id);
296             if (registration == null) {
297                 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
298                         Native.opToStr(op), id, res);
299                 return;
300             }
301             registration.handle(res, flags, op, data, extraCqeData);
302         } catch (Error e) {
303             throw e;
304         } catch (Throwable throwable) {
305             handleLoopException(throwable);
306         }
307     }
308 
309     private void handleEventFdRead() {
310         eventfdReadSubmitted = 0;
311         if (!eventFdClosing) {
312             eventfdAsyncNotify.set(false);
313             submitEventFdRead();
314         }
315     }
316 
317     private void submitEventFdRead() {
318         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
319         long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
320 
321         eventfdReadSubmitted = submissionQueue.addEventFdRead(
322                 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
323     }
324 
325     private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
326                                          boolean linkTimeout, long timeoutNanoSeconds) {
327         if (timeoutNanoSeconds != -1) {
328             long udata = UserData.encode(RINGFD_ID,
329                     linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
330             // We use the same timespec pointer for all add*Timeout operations. This only works because we call
331             // submit directly after it. This ensures the submitted timeout is considered "stable" and so can be reused.
332             long seconds, nanoSeconds;
333             if (timeoutNanoSeconds == 0) {
334                 seconds = 0;
335                 nanoSeconds = 0;
336             } else {
337                 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
338                 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
339             }
340 
341             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
342             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
343             if (linkTimeout) {
344                 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
345             } else {
346                 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
347             }
348         }
349         int submitted = submissionQueue.submitAndGet();
350         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
351         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
352         iovArray.clear();
353         msgHdrMemoryArray.clear();
354         return submitted;
355     }
356 
357     @Override
358     public void prepareToDestroy() {
359         shuttingDown = true;
360         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
361         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
362 
363         List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
364 
365         for (DefaultIoUringIoRegistration registration: copy) {
366             registration.close();
367         }
368 
369         // Write to the eventfd to ensure that if we submitted a read for the eventfd we will see the completion event.
370         Native.eventFdWrite(eventfd.intValue(), 1L);
371 
372         // Ensure all previously submitted IOs get to complete before tearing down everything.
373         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
374         submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
375 
376         // Submit everything and wait until we could drain i.
377         submissionQueue.submitAndGet();
378 
379         while (completionQueue.hasCompletions()) {
380             processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
381             if (submissionQueue.count() > 0) {
382                 submissionQueue.submitAndGetNow();
383             }
384         }
385     }
386 
387     @Override
388     public void destroy() {
389         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
390         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
391         drainEventFd();
392         if (submissionQueue.remaining() < 2) {
393             // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
394             // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
395             submissionQueue.submit();
396         }
397         // Try to drain all the IO from the queue first...
398         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
399         // We need to also specify the Native.IOSQE_LINK flag for it to work as otherwise it is not correctly linked
400         // with the timeout.
401         // See:
402         // - https://man7.org/linux/man-pages/man2/io_uring_enter.2.html
403         // - https://git.kernel.dk/cgit/liburing/commit/?h=link-timeout&id=bc1bd5e97e2c758d6fd975bd35843b9b2c770c5a
404         submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
405         // ... but only wait for 200 milliseconds on this
406         submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
407         completionQueue.process(this::handle);
408         for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
409             ioUringBufferRing.close();
410         }
411         completeRingClose();
412     }
413 
414     // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
415     // already been freed (and potentially reallocated by the OS). Because submitted events is gated on the
416     // `eventfdAsyncNotify` flag we can close the gate but may need to read any outstanding events that have
417     // (or will) be written.
418     private void drainEventFd() {
419         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
420         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
421         assert !eventFdClosing;
422         eventFdClosing = true;
423         boolean eventPending = eventfdAsyncNotify.getAndSet(true);
424         if (eventPending) {
425             // There is an event that has been or will be written by another thread, so we must wait for the event.
426             // Make sure we're actually listening for writes to the event fd.
427             while (eventfdReadSubmitted == 0) {
428                 submitEventFdRead();
429                 submissionQueue.submit();
430             }
431             // Drain the eventfd of the pending wakup.
432             class DrainFdEventCallback implements CompletionCallback {
433                 boolean eventFdDrained;
434 
435                 @Override
436                 public void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
437                     if (UserData.decodeId(udata) == EVENTFD_ID) {
438                         eventFdDrained = true;
439                     }
440                     IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
441                 }
442             }
443             final DrainFdEventCallback handler = new DrainFdEventCallback();
444             completionQueue.process(handler);
445             while (!handler.eventFdDrained) {
446                 submissionQueue.submitAndGet();
447                 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
448             }
449         }
450         // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
451         // transition back to false, thus we should never have any more events written.
452         // So, if we have a read event pending, we can cancel it.
453         if (eventfdReadSubmitted != 0) {
454             long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
455             submissionQueue.addCancel(eventfdReadSubmitted, udata);
456             eventfdReadSubmitted = 0;
457             submissionQueue.submit();
458         }
459     }
460 
461     private void completeRingClose() {
462         if (closeCompleted) {
463             // already done.
464             return;
465         }
466         closeCompleted = true;
467         ringBuffer.close();
468         try {
469             eventfd.close();
470         } catch (IOException e) {
471             logger.warn("Failed to close eventfd", e);
472         }
473         eventfdReadBufCleanable.clean();
474         timeoutMemoryCleanable.clean();
475         iovArray.release();
476         msgHdrMemoryArray.release();
477     }
478 
479     @Override
480     public IoRegistration register(IoHandle handle) throws Exception {
481         IoUringIoHandle ioHandle = cast(handle);
482         if (shuttingDown) {
483             throw new IllegalStateException("IoUringIoHandler is shutting down");
484         }
485         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
486         for (;;) {
487             int id = nextRegistrationId();
488             DefaultIoUringIoRegistration old = registrations.put(id, registration);
489             if (old != null) {
490                 assert old.handle != registration.handle;
491                 registrations.put(id, old);
492             } else {
493                 registration.setId(id);
494                 ioHandle.registered();
495                 break;
496             }
497         }
498 
499         return registration;
500     }
501 
502     private int nextRegistrationId() {
503         int id;
504         do {
505             id = nextRegistrationId++;
506         } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
507         return id;
508     }
509 
510     private final class DefaultIoUringIoRegistration implements IoRegistration {
511         private final AtomicBoolean canceled = new AtomicBoolean();
512         private final ThreadAwareExecutor executor;
513         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
514         final IoUringIoHandle handle;
515 
516         private boolean removeLater;
517         private int outstandingCompletions;
518         private int id;
519 
520         DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
521             this.executor = executor;
522             this.handle = handle;
523         }
524 
525         void setId(int id) {
526             this.id = id;
527         }
528 
529         @Override
530         public long submit(IoOps ops) {
531             IoUringIoOps ioOps = (IoUringIoOps) ops;
532             if (!isValid()) {
533                 return INVALID_ID;
534             }
535             if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
536                 // Because we expect at least 1 completion per submission we can't support IOSQE_CQE_SKIP_SUCCESS
537                 // as it will only produce a completion on failure.
538                 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
539             }
540             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
541             if (executor.isExecutorThread(Thread.currentThread())) {
542                 submit0(ioOps, udata);
543             } else {
544                 executor.execute(() -> submit0(ioOps, udata));
545             }
546             return udata;
547         }
548 
549         private void submit0(IoUringIoOps ioOps, long udata) {
550             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
551                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
552                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
553             );
554             outstandingCompletions++;
555         }
556 
557         @SuppressWarnings("unchecked")
558         @Override
559         public <T> T attachment() {
560             return (T) IoUringIoHandler.this;
561         }
562 
563         @Override
564         public boolean isValid() {
565             return !canceled.get();
566         }
567 
568         @Override
569         public boolean cancel() {
570             if (!canceled.compareAndSet(false, true)) {
571                 // Already cancelled.
572                 return false;
573             }
574             if (executor.isExecutorThread(Thread.currentThread())) {
575                 tryRemove();
576             } else {
577                 executor.execute(this::tryRemove);
578             }
579             return true;
580         }
581 
582         private void tryRemove() {
583             if (outstandingCompletions > 0) {
584                 // We have some completions outstanding, we will remove the id <-> registration mapping
585                 // once these are done.
586                 removeLater = true;
587                 return;
588             }
589             remove();
590         }
591 
592         private void remove() {
593             DefaultIoUringIoRegistration old = registrations.remove(id);
594             assert old == this;
595             handle.unregistered();
596         }
597 
598         void close() {
599             // Closing the handle will also cancel the registration.
600             // It's important that we not manually cancel as close() might need to submit some work to the ring.
601             assert executor.isExecutorThread(Thread.currentThread());
602             try {
603                 handle.close();
604             } catch (Exception e) {
605                 logger.debug("Exception during closing " + handle, e);
606             }
607         }
608 
609         void handle(int res, int flags, byte op, short data, ByteBuffer extraCqeData) {
610             event.update(res, flags, op, data, extraCqeData);
611             handle.handle(this, event);
612             // Only decrement outstandingCompletions if IORING_CQE_F_MORE is not set as otherwise we know that we will
613             // receive more completions for the intial request.
614             if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
615                 // No more outstanding completions, remove the fd <-> registration mapping now.
616                 removeLater = false;
617                 remove();
618             }
619         }
620     }
621 
622     private static IoUringIoHandle cast(IoHandle handle) {
623         if (handle instanceof IoUringIoHandle) {
624             return (IoUringIoHandle) handle;
625         }
626         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
627     }
628 
629     @Override
630     public void wakeup() {
631         if (!executor.isExecutorThread(Thread.currentThread()) &&
632                 !eventfdAsyncNotify.getAndSet(true)) {
633             // write to the eventfd which will then trigger an eventfd read completion.
634             Native.eventFdWrite(eventfd.intValue(), 1L);
635         }
636     }
637 
638     @Override
639     public boolean isCompatible(Class<? extends IoHandle> handleType) {
640         return IoUringIoHandle.class.isAssignableFrom(handleType);
641     }
642 
643     IovArray iovArray() {
644         if (iovArray.isFull()) {
645             // Submit so we can reuse the iovArray.
646             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
647         }
648         assert iovArray.count() == 0;
649         return iovArray;
650     }
651 
652     MsgHdrMemoryArray msgHdrMemoryArray() {
653         if (msgHdrMemoryArray.isFull()) {
654             // Submit so we can reuse the msgHdrArray.
655             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
656         }
657         return msgHdrMemoryArray;
658     }
659 
660     /**
661      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
662      */
663     byte[] inet4AddressArray() {
664         return inet4AddressArray;
665     }
666 
667     /**
668      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
669      */
670     byte[] inet6AddressArray() {
671         return inet6AddressArray;
672     }
673 
674     /**
675      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
676      *
677      * @return factory
678      */
679     public static IoHandlerFactory newFactory() {
680         return newFactory(new IoUringIoHandlerConfig());
681     }
682 
683     /**
684      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
685      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
686      *
687      * @param  ringSize     the size of the ring.
688      * @return              factory
689      */
690     public static IoHandlerFactory newFactory(int ringSize) {
691         IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
692         configuration.setRingSize(ringSize);
693         return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
694     }
695 
696     /**
697      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
698      * Each {@link IoUringIoHandler} will use same option
699      * @param config the io_uring configuration
700      * @return factory
701      */
702     public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
703         IoUring.ensureAvailability();
704         final IoUringIoHandlerConfig copy = ObjectUtil.checkNotNull(config, "config").verifyAndClone();
705         return new IoHandlerFactory() {
706             @Override
707             public IoHandler newHandler(ThreadAwareExecutor eventLoop) {
708                 return new IoUringIoHandler(eventLoop, copy);
709             }
710 
711             @Override
712             public boolean isChangingThreadSupported() {
713                 return !copy.singleIssuer();
714             }
715         };
716     }
717 }