View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoHandle;
19  import io.netty.channel.IoHandler;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandlerFactory;
22  import io.netty.channel.IoOps;
23  import io.netty.channel.IoRegistration;
24  import io.netty.channel.unix.Buffer;
25  import io.netty.channel.unix.Errors;
26  import io.netty.channel.unix.FileDescriptor;
27  import io.netty.channel.unix.IovArray;
28  import io.netty.util.collection.IntObjectHashMap;
29  import io.netty.util.collection.IntObjectMap;
30  import io.netty.util.concurrent.ThreadAwareExecutor;
31  import io.netty.util.internal.CleanableDirectBuffer;
32  import io.netty.util.internal.ObjectUtil;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.io.UncheckedIOException;
39  import java.nio.ByteBuffer;
40  import java.util.ArrayList;
41  import java.util.Collection;
42  import java.util.List;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  
46  import static java.lang.Math.max;
47  import static java.lang.Math.min;
48  import static java.util.Objects.requireNonNull;
49  
50  /**
51   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
52   */
53  public final class IoUringIoHandler implements IoHandler {
54      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55  
56      private final RingBuffer ringBuffer;
57      private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58      private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59      // The maximum number of bytes for an InetAddress / Inet6Address
60      private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61      private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62  
63      private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64      private final FileDescriptor eventfd;
65      private final CleanableDirectBuffer eventfdReadBufCleanable;
66      private final ByteBuffer eventfdReadBuf;
67      private final long eventfdReadBufAddress;
68      private final CleanableDirectBuffer timeoutMemoryCleanable;
69      private final ByteBuffer timeoutMemory;
70      private final long timeoutMemoryAddress;
71      private final IovArray iovArray;
72      private final MsgHdrMemoryArray msgHdrMemoryArray;
73      private long eventfdReadSubmitted;
74      private boolean eventFdClosing;
75      private volatile boolean shuttingDown;
76      private boolean closeCompleted;
77      private int nextRegistrationId = Integer.MIN_VALUE;
78  
79      // these two ids are used internally any so can't be used by nextRegistrationId().
80      private static final int EVENTFD_ID = Integer.MAX_VALUE;
81      private static final int RINGFD_ID = EVENTFD_ID - 1;
82      private static final int INVALID_ID = 0;
83  
84      private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
85  
86      private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
87      private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
88  
89      private final ThreadAwareExecutor executor;
90  
91      IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
92          // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
93          IoUring.ensureAvailability();
94          this.executor = requireNonNull(executor, "executor");
95          requireNonNull(config, "config");
96          int setupFlags = Native.setupFlags(config.singleIssuer());
97  
98          //The default cq size is always twice the ringSize.
99          // It only makes sense when the user actually specifies the cq ring size.
100         int cqSize = 2 * config.getRingSize();
101         if (config.needSetupCqeSize()) {
102             if (!IoUring.isSetupCqeSizeSupported()) {
103                 throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
104             }
105             setupFlags |= Native.IORING_SETUP_CQSIZE;
106             cqSize = config.checkCqSize(config.getCqSize());
107         }
108         this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
109         if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
110             int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
111             int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
112             int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
113             if (result < 0) {
114                 // Close ringBuffer before throwing to ensure we release all memory on failure.
115                 ringBuffer.close();
116                 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
117             }
118         }
119 
120         registeredIoUringBufferRing = new IntObjectHashMap<>();
121         Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
122         if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
123             if (!IoUring.isRegisterBufferRingSupported()) {
124                 // Close ringBuffer before throwing to ensure we release all memory on failure.
125                 ringBuffer.close();
126                 throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
127             }
128             for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
129                 try {
130                     IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
131                     registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
132                 } catch (Errors.NativeIoException e) {
133                     for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
134                         bufferRing.close();
135                     }
136                     // Close ringBuffer before throwing to ensure we release all memory on failure.
137                     ringBuffer.close();
138                     throw new UncheckedIOException(e);
139                 }
140             }
141         }
142 
143         registrations = new IntObjectHashMap<>();
144         eventfd = Native.newBlockingEventFd();
145         eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
146         eventfdReadBuf = eventfdReadBufCleanable.buffer();
147         eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
148         timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
149         timeoutMemory = timeoutMemoryCleanable.buffer();
150         timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
151         iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
152         msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
153     }
154 
155     @Override
156     public void initialize() {
157         ringBuffer.enable();
158         // Fill all buffer rings now.
159         for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
160             bufferRing.initialize();
161         }
162     }
163 
164     @Override
165     public int run(IoHandlerContext context) {
166         if (closeCompleted) {
167             if (context.shouldReportActiveIoTime()) {
168                 context.reportActiveIoTime(0);
169             }
170             return 0;
171         }
172         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
173         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
174         if (!completionQueue.hasCompletions() && context.canBlock()) {
175             if (eventfdReadSubmitted == 0) {
176                 submitEventFdRead();
177             }
178             long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
179             submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
180         } else {
181             // Even if we have some completions already pending we can still try to even fetch more.
182             submitAndClearNow(submissionQueue);
183         }
184 
185         int processed;
186         if (context.shouldReportActiveIoTime()) {
187             // Timer starts after the blocking wait, around the processing of completions.
188             long activeIoStartTimeNanos = System.nanoTime();
189             processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
190             long activeIoEndTimeNanos = System.nanoTime();
191             context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
192         } else {
193             processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
194         }
195         return processed;
196     }
197 
198     private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
199                                          CompletionCallback callback) {
200         int processed = 0;
201         // Bound the maximum number of times this will loop before we return and so execute some non IO stuff.
202         // 128 here is just some sort of bound and another number might be ok as well.
203         for (int i = 0; i < 128; i++) {
204             int p = completionQueue.process(callback);
205             if ((submissionQueue.flags() & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
206                 logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
207                         completionQueue.ringEntries);
208                 submitAndClearNow(submissionQueue);
209             } else if (p == 0 &&
210                     // Check if there are any more submissions pending, if not break the loop.
211                     (submissionQueue.count() == 0 ||
212                     // Let's try to submit again and check if there are new completions to handle.
213                     // Only break the loop if there was nothing submitted and there are no new completions.
214                     (submitAndClearNow(submissionQueue) == 0 && !completionQueue.hasCompletions()))) {
215                 break;
216             }
217             processed += p;
218         }
219         return processed;
220     }
221 
222     private int submitAndClearNow(SubmissionQueue submissionQueue) {
223         int submitted = submissionQueue.submitAndGetNow();
224 
225         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
226         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
227         iovArray.clear();
228         msgHdrMemoryArray.clear();
229         return submitted;
230     }
231 
232     private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
233             throws Errors.NativeIoException {
234         short bufferRingSize = bufferRingConfig.bufferRingSize();
235         short bufferGroupId = bufferRingConfig.bufferGroupId();
236         int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
237         long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
238         if (ioUringBufRingAddr < 0) {
239             throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
240         }
241         return new IoUringBufferRing(ringFd,
242                 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
243                 bufferRingSize, bufferRingConfig.batchSize(),
244                 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
245                 bufferRingConfig.isBatchAllocation()
246         );
247     }
248 
249     IoUringBufferRing findBufferRing(short bgId) {
250         IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
251         if (cached != null) {
252             return cached;
253         }
254         throw new IllegalArgumentException(
255                 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
256         );
257     }
258 
259     private static void handleLoopException(Throwable throwable) {
260         logger.warn("Unexpected exception in the IO event loop.", throwable);
261 
262         // Prevent possible consecutive immediate failures that lead to
263         // excessive CPU consumption.
264         try {
265             Thread.sleep(100);
266         } catch (InterruptedException ignore) {
267             // ignore
268         }
269     }
270 
271     private boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
272         try {
273             int id = UserData.decodeId(udata);
274             byte op = UserData.decodeOp(udata);
275             short data = UserData.decodeData(udata);
276 
277             if (logger.isTraceEnabled()) {
278                 logger.trace("completed(ring {}): {}(id={}, res={})",
279                         ringBuffer.fd(), Native.opToStr(op), data, res);
280             }
281             if (id == EVENTFD_ID) {
282                 handleEventFdRead();
283                 return true;
284             }
285             if (id == RINGFD_ID) {
286                 // Just return
287                 return true;
288             }
289             DefaultIoUringIoRegistration registration = registrations.get(id);
290             if (registration == null) {
291                 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
292                         Native.opToStr(op), id, res);
293                 return true;
294             }
295             registration.handle(res, flags, op, data, extraCqeData);
296             return true;
297         } catch (Error e) {
298             throw e;
299         } catch (Throwable throwable) {
300             handleLoopException(throwable);
301             return true;
302         }
303     }
304 
305     private void handleEventFdRead() {
306         eventfdReadSubmitted = 0;
307         if (!eventFdClosing) {
308             eventfdAsyncNotify.set(false);
309             submitEventFdRead();
310         }
311     }
312 
313     private void submitEventFdRead() {
314         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
315         long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
316 
317         eventfdReadSubmitted = submissionQueue.addEventFdRead(
318                 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
319     }
320 
321     private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
322                                          boolean linkTimeout, long timeoutNanoSeconds) {
323         if (timeoutNanoSeconds != -1) {
324             long udata = UserData.encode(RINGFD_ID,
325                     linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
326             // We use the same timespec pointer for all add*Timeout operations. This only works because we call
327             // submit directly after it. This ensures the submitted timeout is considered "stable" and so can be reused.
328             long seconds, nanoSeconds;
329             if (timeoutNanoSeconds == 0) {
330                 seconds = 0;
331                 nanoSeconds = 0;
332             } else {
333                 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
334                 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
335             }
336 
337             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
338             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
339             if (linkTimeout) {
340                 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
341             } else {
342                 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
343             }
344         }
345         int submitted = submissionQueue.submitAndGet();
346         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
347         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
348         iovArray.clear();
349         msgHdrMemoryArray.clear();
350         return submitted;
351     }
352 
353     @Override
354     public void prepareToDestroy() {
355         shuttingDown = true;
356         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
357         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
358 
359         List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
360 
361         for (DefaultIoUringIoRegistration registration: copy) {
362             registration.close();
363         }
364 
365         // Write to the eventfd to ensure that if we submitted a read for the eventfd we will see the completion event.
366         Native.eventFdWrite(eventfd.intValue(), 1L);
367 
368         // Ensure all previously submitted IOs get to complete before tearing down everything.
369         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
370         submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
371 
372         // Submit everything and wait until we could drain i.
373         submissionQueue.submitAndGet();
374 
375         while (completionQueue.hasCompletions()) {
376             processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
377             if (submissionQueue.count() > 0) {
378                 submissionQueue.submitAndGetNow();
379             }
380         }
381     }
382 
383     @Override
384     public void destroy() {
385         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
386         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
387         drainEventFd();
388         if (submissionQueue.remaining() < 2) {
389             // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
390             // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
391             submissionQueue.submit();
392         }
393         // Try to drain all the IO from the queue first...
394         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
395         // We need to also specify the Native.IOSQE_LINK flag for it to work as otherwise it is not correctly linked
396         // with the timeout.
397         // See:
398         // - https://man7.org/linux/man-pages/man2/io_uring_enter.2.html
399         // - https://git.kernel.dk/cgit/liburing/commit/?h=link-timeout&id=bc1bd5e97e2c758d6fd975bd35843b9b2c770c5a
400         submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
401         // ... but only wait for 200 milliseconds on this
402         submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
403         completionQueue.process(this::handle);
404         for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
405             ioUringBufferRing.close();
406         }
407         completeRingClose();
408     }
409 
410     // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
411     // already been freed (and potentially reallocated by the OS). Because submitted events is gated on the
412     // `eventfdAsyncNotify` flag we can close the gate but may need to read any outstanding events that have
413     // (or will) be written.
414     private void drainEventFd() {
415         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
416         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
417         assert !eventFdClosing;
418         eventFdClosing = true;
419         boolean eventPending = eventfdAsyncNotify.getAndSet(true);
420         if (eventPending) {
421             // There is an event that has been or will be written by another thread, so we must wait for the event.
422             // Make sure we're actually listening for writes to the event fd.
423             while (eventfdReadSubmitted == 0) {
424                 submitEventFdRead();
425                 submissionQueue.submit();
426             }
427             // Drain the eventfd of the pending wakup.
428             class DrainFdEventCallback implements CompletionCallback {
429                 boolean eventFdDrained;
430 
431                 @Override
432                 public boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
433                     if (UserData.decodeId(udata) == EVENTFD_ID) {
434                         eventFdDrained = true;
435                     }
436                     return IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
437                 }
438             }
439             final DrainFdEventCallback handler = new DrainFdEventCallback();
440             completionQueue.process(handler);
441             while (!handler.eventFdDrained) {
442                 submissionQueue.submitAndGet();
443                 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
444             }
445         }
446         // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
447         // transition back to false, thus we should never have any more events written.
448         // So, if we have a read event pending, we can cancel it.
449         if (eventfdReadSubmitted != 0) {
450             long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
451             submissionQueue.addCancel(eventfdReadSubmitted, udata);
452             eventfdReadSubmitted = 0;
453             submissionQueue.submit();
454         }
455     }
456 
457     private void completeRingClose() {
458         if (closeCompleted) {
459             // already done.
460             return;
461         }
462         closeCompleted = true;
463         ringBuffer.close();
464         try {
465             eventfd.close();
466         } catch (IOException e) {
467             logger.warn("Failed to close eventfd", e);
468         }
469         eventfdReadBufCleanable.clean();
470         timeoutMemoryCleanable.clean();
471         iovArray.release();
472         msgHdrMemoryArray.release();
473     }
474 
475     @Override
476     public IoRegistration register(IoHandle handle) throws Exception {
477         IoUringIoHandle ioHandle = cast(handle);
478         if (shuttingDown) {
479             throw new IllegalStateException("IoUringIoHandler is shutting down");
480         }
481         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
482         for (;;) {
483             int id = nextRegistrationId();
484             DefaultIoUringIoRegistration old = registrations.put(id, registration);
485             if (old != null) {
486                 assert old.handle != registration.handle;
487                 registrations.put(id, old);
488             } else {
489                 registration.setId(id);
490                 break;
491             }
492         }
493 
494         return registration;
495     }
496 
497     private int nextRegistrationId() {
498         int id;
499         do {
500             id = nextRegistrationId++;
501         } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
502         return id;
503     }
504 
505     private final class DefaultIoUringIoRegistration implements IoRegistration {
506         private final AtomicBoolean canceled = new AtomicBoolean();
507         private final ThreadAwareExecutor executor;
508         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
509         final IoUringIoHandle handle;
510 
511         private boolean removeLater;
512         private int outstandingCompletions;
513         private int id;
514 
515         DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
516             this.executor = executor;
517             this.handle = handle;
518         }
519 
520         void setId(int id) {
521             this.id = id;
522         }
523 
524         @Override
525         public long submit(IoOps ops) {
526             IoUringIoOps ioOps = (IoUringIoOps) ops;
527             if (!isValid()) {
528                 return INVALID_ID;
529             }
530             if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
531                 // Because we expect at least 1 completion per submission we can't support IOSQE_CQE_SKIP_SUCCESS
532                 // as it will only produce a completion on failure.
533                 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
534             }
535             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
536             if (executor.isExecutorThread(Thread.currentThread())) {
537                 submit0(ioOps, udata);
538             } else {
539                 executor.execute(() -> submit0(ioOps, udata));
540             }
541             return udata;
542         }
543 
544         private void submit0(IoUringIoOps ioOps, long udata) {
545             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
546                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
547                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
548             );
549             outstandingCompletions++;
550         }
551 
552         @SuppressWarnings("unchecked")
553         @Override
554         public <T> T attachment() {
555             return (T) IoUringIoHandler.this;
556         }
557 
558         @Override
559         public boolean isValid() {
560             return !canceled.get();
561         }
562 
563         @Override
564         public boolean cancel() {
565             if (!canceled.compareAndSet(false, true)) {
566                 // Already cancelled.
567                 return false;
568             }
569             if (executor.isExecutorThread(Thread.currentThread())) {
570                 tryRemove();
571             } else {
572                 executor.execute(this::tryRemove);
573             }
574             return true;
575         }
576 
577         private void tryRemove() {
578             if (outstandingCompletions > 0) {
579                 // We have some completions outstanding, we will remove the id <-> registration mapping
580                 // once these are done.
581                 removeLater = true;
582                 return;
583             }
584             remove();
585         }
586 
587         private void remove() {
588             DefaultIoUringIoRegistration old = registrations.remove(id);
589             assert old == this;
590         }
591 
592         void close() {
593             // Closing the handle will also cancel the registration.
594             // It's important that we not manually cancel as close() might need to submit some work to the ring.
595             assert executor.isExecutorThread(Thread.currentThread());
596             try {
597                 handle.close();
598             } catch (Exception e) {
599                 logger.debug("Exception during closing " + handle, e);
600             }
601         }
602 
603         void handle(int res, int flags, byte op, short data, ByteBuffer extraCqeData) {
604             event.update(res, flags, op, data, extraCqeData);
605             handle.handle(this, event);
606             // Only decrement outstandingCompletions if IORING_CQE_F_MORE is not set as otherwise we know that we will
607             // receive more completions for the intial request.
608             if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
609                 // No more outstanding completions, remove the fd <-> registration mapping now.
610                 removeLater = false;
611                 remove();
612             }
613         }
614     }
615 
616     private static IoUringIoHandle cast(IoHandle handle) {
617         if (handle instanceof IoUringIoHandle) {
618             return (IoUringIoHandle) handle;
619         }
620         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
621     }
622 
623     @Override
624     public void wakeup() {
625         if (!executor.isExecutorThread(Thread.currentThread()) &&
626                 !eventfdAsyncNotify.getAndSet(true)) {
627             // write to the eventfd which will then trigger an eventfd read completion.
628             Native.eventFdWrite(eventfd.intValue(), 1L);
629         }
630     }
631 
632     @Override
633     public boolean isCompatible(Class<? extends IoHandle> handleType) {
634         return IoUringIoHandle.class.isAssignableFrom(handleType);
635     }
636 
637     IovArray iovArray() {
638         if (iovArray.isFull()) {
639             // Submit so we can reuse the iovArray.
640             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
641         }
642         assert iovArray.count() == 0;
643         return iovArray;
644     }
645 
646     MsgHdrMemoryArray msgHdrMemoryArray() {
647         if (msgHdrMemoryArray.isFull()) {
648             // Submit so we can reuse the msgHdrArray.
649             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
650         }
651         return msgHdrMemoryArray;
652     }
653 
654     /**
655      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
656      */
657     byte[] inet4AddressArray() {
658         return inet4AddressArray;
659     }
660 
661     /**
662      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
663      */
664     byte[] inet6AddressArray() {
665         return inet6AddressArray;
666     }
667 
668     /**
669      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
670      *
671      * @return factory
672      */
673     public static IoHandlerFactory newFactory() {
674         return newFactory(new IoUringIoHandlerConfig());
675     }
676 
677     /**
678      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
679      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
680      *
681      * @param  ringSize     the size of the ring.
682      * @return              factory
683      */
684     public static IoHandlerFactory newFactory(int ringSize) {
685         IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
686         configuration.setRingSize(ringSize);
687         return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
688     }
689 
690     /**
691      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
692      * Each {@link IoUringIoHandler} will use same option
693      * @param config the io_uring configuration
694      * @return factory
695      */
696     public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
697         IoUring.ensureAvailability();
698         ObjectUtil.checkNotNull(config, "config");
699         return new IoHandlerFactory() {
700             @Override
701             public IoHandler newHandler(ThreadAwareExecutor eventLoop) {
702                 return new IoUringIoHandler(eventLoop, config);
703             }
704 
705             @Override
706             public boolean isChangingThreadSupported() {
707                 return !config.singleIssuer();
708             }
709         };
710     }
711 }