View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.IoHandlerContext;
20  import io.netty.channel.IoHandle;
21  import io.netty.channel.IoHandler;
22  import io.netty.channel.IoHandlerFactory;
23  import io.netty.channel.IoOps;
24  import io.netty.channel.IoRegistration;
25  import io.netty.channel.unix.Buffer;
26  import io.netty.channel.unix.Errors;
27  import io.netty.channel.unix.FileDescriptor;
28  import io.netty.channel.unix.IovArray;
29  import io.netty.util.collection.IntObjectHashMap;
30  import io.netty.util.collection.IntObjectMap;
31  import io.netty.util.concurrent.ThreadAwareExecutor;
32  import io.netty.util.internal.ObjectUtil;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.io.UncheckedIOException;
39  import java.nio.ByteBuffer;
40  import java.util.ArrayList;
41  import java.util.Collection;
42  import java.util.List;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  
46  import static java.lang.Math.max;
47  import static java.lang.Math.min;
48  import static java.util.Objects.requireNonNull;
49  
50  /**
51   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
52   */
53  public final class IoUringIoHandler implements IoHandler {
54      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55  
56      private final RingBuffer ringBuffer;
57      private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58      private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59      // The maximum number of bytes for an InetAddress / Inet6Address
60      private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61      private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62  
63      private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64      private final FileDescriptor eventfd;
65      private final ByteBuffer eventfdReadBuf;
66      private final long eventfdReadBufAddress;
67      private final ByteBuffer timeoutMemory;
68      private final long timeoutMemoryAddress;
69      private final IovArray iovArray;
70      private long eventfdReadSubmitted;
71      private boolean eventFdClosing;
72      private volatile boolean shuttingDown;
73      private boolean closeCompleted;
74      private int nextRegistrationId = Integer.MIN_VALUE;
75  
76      // these two ids are used internally any so can't be used by nextRegistrationId().
77      private static final int EVENTFD_ID = Integer.MAX_VALUE;
78      private static final int RINGFD_ID = EVENTFD_ID - 1;
79      private static final int INVALID_ID = 0;
80  
81      private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
82  
83      private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
84      private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
85  
86      private final CompletionBuffer completionBuffer;
87      private final ThreadAwareExecutor executor;
88  
89      IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
90          // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
91          IoUring.ensureAvailability();
92          this.executor = requireNonNull(executor, "executor");
93          requireNonNull(config, "config");
94          int setupFlags = Native.setupFlags();
95  
96          //The default cq size is always twice the ringSize.
97          // It only makes sense when the user actually specifies the cq ring size.
98          int cqSize = 2 * config.getRingSize();
99          if (config.needSetupCqeSize()) {
100             if (!IoUring.isSetupCqeSizeSupported()) {
101                 throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
102             }
103             setupFlags |= Native.IORING_SETUP_CQSIZE;
104             cqSize = config.checkCqSize(config.getCqSize());
105         }
106         this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
107         if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
108             int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
109             int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
110             int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
111             if (result < 0) {
112                 // Close ringBuffer before throwing to ensure we release all memory on failure.
113                 ringBuffer.close();
114                 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
115             }
116         }
117 
118         registeredIoUringBufferRing = new IntObjectHashMap<>();
119         Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
120         if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
121             if (!IoUring.isRegisterBufferRingSupported()) {
122                 // Close ringBuffer before throwing to ensure we release all memory on failure.
123                 ringBuffer.close();
124                 throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
125             }
126             for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
127                 try {
128                     IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
129                     registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
130                 } catch (Errors.NativeIoException e) {
131                     for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
132                         bufferRing.close();
133                     }
134                     // Close ringBuffer before throwing to ensure we release all memory on failure.
135                     ringBuffer.close();
136                     throw new UncheckedIOException(e);
137                 }
138             }
139         }
140 
141         registrations = new IntObjectHashMap<>();
142         eventfd = Native.newBlockingEventFd();
143         eventfdReadBuf = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
144         eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
145         this.timeoutMemory = Buffer.allocateDirectWithNativeOrder(KERNEL_TIMESPEC_SIZE);
146         this.timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
147         // We buffer a maximum of 2 * CompletionQueue.ringCapacity completions before we drain them in batches.
148         // Also as we never submit an udata which is 0L we use this as the tombstone marker.
149         completionBuffer = new CompletionBuffer(ringBuffer.ioUringCompletionQueue().ringCapacity * 2, 0);
150 
151         iovArray = new IovArray(Unpooled.wrappedBuffer(
152                 Buffer.allocateDirectWithNativeOrder(IoUring.NUM_ELEMENTS_IOVEC * IovArray.IOV_SIZE))
153                 .setIndex(0, 0));
154     }
155 
156     @Override
157     public void initialize() {
158         ringBuffer.enable();
159         // Fill all buffer rings now.
160         for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
161             bufferRing.initialize();
162         }
163     }
164 
165     @Override
166     public int run(IoHandlerContext context) {
167         if (closeCompleted) {
168             return 0;
169         }
170         int processedPerRun = 0;
171         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
172         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
173         if (!completionQueue.hasCompletions() && context.canBlock()) {
174             if (eventfdReadSubmitted == 0) {
175                 submitEventFdRead();
176             }
177             long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
178             submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
179         } else {
180             submitAndClear(submissionQueue);
181         }
182         for (;;) {
183             // we might call submitAndRunNow() while processing stuff in the completionArray we need to
184             // add the processed completions to processedPerRun.
185             int processed = drainAndProcessAll(completionQueue, this::handle);
186             processedPerRun += processed;
187 
188             // Let's submit again.
189             // If we were not able to submit anything and there was nothing left in the completionBuffer we will
190             // break out of the loop and return to the caller.
191             if (submitAndClear(submissionQueue) == 0 && processed == 0) {
192                 break;
193             }
194         }
195 
196         return processedPerRun;
197     }
198 
199     void submitAndRunNow(long udata) {
200         if (closeCompleted) {
201             return;
202         }
203         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
204         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
205         if (submitAndClear(submissionQueue) > 0) {
206             completionBuffer.drain(completionQueue);
207             completionBuffer.processOneNow(this::handle, udata);
208         }
209     }
210 
211     private int submitAndClear(SubmissionQueue submissionQueue) {
212         int submitted = submissionQueue.submit();
213 
214         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
215         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
216         iovArray.clear();
217         return submitted;
218     }
219 
220     private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
221             throws Errors.NativeIoException {
222         short bufferRingSize = bufferRingConfig.bufferRingSize();
223         short bufferGroupId = bufferRingConfig.bufferGroupId();
224         int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
225         long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
226         if (ioUringBufRingAddr < 0) {
227             throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
228         }
229         return new IoUringBufferRing(ringFd,
230                 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
231                 bufferRingSize, bufferRingConfig.batchSize(), bufferRingConfig.maxUnreleasedBuffers(),
232                 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator()
233         );
234     }
235 
236     IoUringBufferRing findBufferRing(short bgId) {
237         IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
238         if (cached != null) {
239             return cached;
240         }
241         throw new IllegalArgumentException(
242                 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
243         );
244     }
245 
246     private int drainAndProcessAll(CompletionQueue completionQueue, CompletionCallback callback) {
247         int processed = 0;
248         for (;;) {
249             boolean drainedAll = completionBuffer.drain(completionQueue);
250             processed += completionBuffer.processNow(callback);
251             if (drainedAll) {
252                 break;
253             }
254         }
255         return processed;
256     }
257 
258     private static void handleLoopException(Throwable throwable) {
259         logger.warn("Unexpected exception in the IO event loop.", throwable);
260 
261         // Prevent possible consecutive immediate failures that lead to
262         // excessive CPU consumption.
263         try {
264             Thread.sleep(100);
265         } catch (InterruptedException ignore) {
266             // ignore
267         }
268     }
269 
270     private boolean handle(int res, int flags, long udata) {
271         try {
272             int id = UserData.decodeId(udata);
273             byte op = UserData.decodeOp(udata);
274             short data = UserData.decodeData(udata);
275 
276             if (logger.isTraceEnabled()) {
277                 logger.trace("completed(ring {}): {}(id={}, res={})",
278                         ringBuffer.fd(), Native.opToStr(op), data, res);
279             }
280             if (id == EVENTFD_ID) {
281                 handleEventFdRead();
282                 return true;
283             }
284             if (id == RINGFD_ID) {
285                 // Just return
286                 return true;
287             }
288             DefaultIoUringIoRegistration registration = registrations.get(id);
289             if (registration == null) {
290                 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
291                         Native.opToStr(op), id, res);
292                 return true;
293             }
294             registration.handle(res, flags, op, data);
295             return true;
296         } catch (Error e) {
297             throw e;
298         } catch (Throwable throwable) {
299             handleLoopException(throwable);
300             return true;
301         }
302     }
303 
304     private void handleEventFdRead() {
305         eventfdReadSubmitted = 0;
306         if (!eventFdClosing) {
307             eventfdAsyncNotify.set(false);
308             submitEventFdRead();
309         }
310     }
311 
312     private void submitEventFdRead() {
313         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
314         long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
315 
316         eventfdReadSubmitted = submissionQueue.addEventFdRead(
317                 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
318     }
319 
320     private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
321                                          boolean linkTimeout, long timeoutNanoSeconds) {
322         if (timeoutNanoSeconds != -1) {
323             long udata = UserData.encode(RINGFD_ID,
324                     linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
325             // We use the same timespec pointer for all add*Timeout operations. This only works because we call
326             // submit directly after it. This ensures the submitted timeout is considered "stable" and so can be reused.
327             long seconds, nanoSeconds;
328             if (timeoutNanoSeconds == 0) {
329                 seconds = 0;
330                 nanoSeconds = 0;
331             } else {
332                 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
333                 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
334             }
335 
336             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
337             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
338             if (linkTimeout) {
339                 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
340             } else {
341                 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
342             }
343         }
344         int submitted = submissionQueue.submitAndWait();
345         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
346         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
347         iovArray.clear();
348         return submitted;
349     }
350 
351     @Override
352     public void prepareToDestroy() {
353         shuttingDown = true;
354         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
355         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
356 
357         List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
358 
359         for (DefaultIoUringIoRegistration registration: copy) {
360             registration.close();
361         }
362 
363         // Write to the eventfd to ensure that if we submitted a read for the eventfd we will see the completion event.
364         Native.eventFdWrite(eventfd.intValue(), 1L);
365 
366         // Ensure all previously submitted IOs get to complete before tearing down everything.
367         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
368         submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
369 
370         // Submit everything and wait until we could drain i.
371         submissionQueue.submitAndWait();
372         while (completionQueue.hasCompletions()) {
373             completionQueue.process(this::handle);
374 
375             if (submissionQueue.count() > 0) {
376                 submissionQueue.submit();
377             }
378         }
379     }
380 
381     @Override
382     public void destroy() {
383         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
384         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
385         drainEventFd();
386         if (submissionQueue.remaining() < 2) {
387             // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
388             // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
389             submissionQueue.submit();
390         }
391         // Try to drain all the IO from the queue first...
392         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
393         // We need to also specify the Native.IOSQE_LINK flag for it to work as otherwise it is not correctly linked
394         // with the timeout.
395         // See:
396         // - https://man7.org/linux/man-pages/man2/io_uring_enter.2.html
397         // - https://git.kernel.dk/cgit/liburing/commit/?h=link-timeout&id=bc1bd5e97e2c758d6fd975bd35843b9b2c770c5a
398         submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
399         // ... but only wait for 200 milliseconds on this
400         submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
401         completionQueue.process(this::handle);
402         for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
403             ioUringBufferRing.close();
404         }
405         completeRingClose();
406     }
407 
408     // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
409     // already been freed (and potentially reallocated by the OS). Because submitted events is gated on the
410     // `eventfdAsyncNotify` flag we can close the gate but may need to read any outstanding events that have
411     // (or will) be written.
412     private void drainEventFd() {
413         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
414         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
415         assert !eventFdClosing;
416         eventFdClosing = true;
417         boolean eventPending = eventfdAsyncNotify.getAndSet(true);
418         if (eventPending) {
419             // There is an event that has been or will be written by another thread, so we must wait for the event.
420             // Make sure we're actually listening for writes to the event fd.
421             while (eventfdReadSubmitted == 0) {
422                 submitEventFdRead();
423                 submissionQueue.submit();
424             }
425             // Drain the eventfd of the pending wakup.
426             class DrainFdEventCallback implements CompletionCallback {
427                 boolean eventFdDrained;
428 
429                 @Override
430                 public boolean handle(int res, int flags, long udata) {
431                     if (UserData.decodeId(udata) == EVENTFD_ID) {
432                         eventFdDrained = true;
433                     }
434                     return IoUringIoHandler.this.handle(res, flags, udata);
435                 }
436             }
437             final DrainFdEventCallback handler = new DrainFdEventCallback();
438             drainAndProcessAll(completionQueue, handler);
439             completionQueue.process(handler);
440             while (!handler.eventFdDrained) {
441                 submissionQueue.submitAndWait();
442                 drainAndProcessAll(completionQueue, handler);
443             }
444         }
445         // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
446         // transition back to false, thus we should never have any more events written.
447         // So, if we have a read event pending, we can cancel it.
448         if (eventfdReadSubmitted != 0) {
449             long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
450             submissionQueue.addCancel(eventfdReadSubmitted, udata);
451             eventfdReadSubmitted = 0;
452             submissionQueue.submit();
453         }
454     }
455 
456     private void completeRingClose() {
457         if (closeCompleted) {
458             // already done.
459             return;
460         }
461         closeCompleted = true;
462         ringBuffer.close();
463         try {
464             eventfd.close();
465         } catch (IOException e) {
466             logger.warn("Failed to close eventfd", e);
467         }
468         Buffer.free(eventfdReadBuf);
469         Buffer.free(timeoutMemory);
470         iovArray.release();
471     }
472 
473     @Override
474     public IoRegistration register(IoHandle handle) throws Exception {
475         IoUringIoHandle ioHandle = cast(handle);
476         if (shuttingDown) {
477             throw new IllegalStateException("IoUringIoHandler is shutting down");
478         }
479         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
480         for (;;) {
481             int id = nextRegistrationId();
482             DefaultIoUringIoRegistration old = registrations.put(id, registration);
483             if (old != null) {
484                 assert old.handle != registration.handle;
485                 registrations.put(id, old);
486             } else {
487                 registration.setId(id);
488                 break;
489             }
490         }
491 
492         return registration;
493     }
494 
495     private int nextRegistrationId() {
496         int id;
497         do {
498             id = nextRegistrationId++;
499         } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
500         return id;
501     }
502 
503     private final class DefaultIoUringIoRegistration implements IoRegistration {
504         private final AtomicBoolean canceled = new AtomicBoolean();
505         private final ThreadAwareExecutor executor;
506         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
507         final IoUringIoHandle handle;
508 
509         private boolean removeLater;
510         private int outstandingCompletions;
511         private int id;
512 
513         DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
514             this.executor = executor;
515             this.handle = handle;
516         }
517 
518         void setId(int id) {
519             this.id = id;
520         }
521 
522         @Override
523         public long submit(IoOps ops) {
524             IoUringIoOps ioOps = (IoUringIoOps) ops;
525             if (!isValid()) {
526                 return INVALID_ID;
527             }
528             if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
529                 // Because we expect at least 1 completion per submission we can't support IOSQE_CQE_SKIP_SUCCESS
530                 // as it will only produce a completion on failure.
531                 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
532             }
533             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
534             if (executor.isExecutorThread(Thread.currentThread())) {
535                 submit0(ioOps, udata);
536             } else {
537                 executor.execute(() -> submit0(ioOps, udata));
538             }
539             return udata;
540         }
541 
542         private void submit0(IoUringIoOps ioOps, long udata) {
543             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
544                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
545                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
546             );
547             outstandingCompletions++;
548         }
549 
550         @SuppressWarnings("unchecked")
551         @Override
552         public <T> T attachment() {
553             return (T) IoUringIoHandler.this;
554         }
555 
556         @Override
557         public boolean isValid() {
558             return !canceled.get();
559         }
560 
561         @Override
562         public boolean cancel() {
563             if (!canceled.compareAndSet(false, true)) {
564                 // Already cancelled.
565                 return false;
566             }
567             if (executor.isExecutorThread(Thread.currentThread())) {
568                 tryRemove();
569             } else {
570                 executor.execute(this::tryRemove);
571             }
572             return true;
573         }
574 
575         private void tryRemove() {
576             if (outstandingCompletions > 0) {
577                 // We have some completions outstanding, we will remove the id <-> registration mapping
578                 // once these are done.
579                 removeLater = true;
580                 return;
581             }
582             remove();
583         }
584 
585         private void remove() {
586             DefaultIoUringIoRegistration old = registrations.remove(id);
587             assert old == this;
588         }
589 
590         void close() {
591             // Closing the handle will also cancel the registration.
592             // It's important that we not manually cancel as close() might need to submit some work to the ring.
593             assert executor.isExecutorThread(Thread.currentThread());
594             try {
595                 handle.close();
596             } catch (Exception e) {
597                 logger.debug("Exception during closing " + handle, e);
598             }
599         }
600 
601         void handle(int res, int flags, byte op, short data) {
602             event.update(res, flags, op, data);
603             handle.handle(this, event);
604             // Only decrement outstandingCompletions if IORING_CQE_F_MORE is not set as otherwise we know that we will
605             // receive more completions for the intial request.
606             if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
607                 // No more outstanding completions, remove the fd <-> registration mapping now.
608                 removeLater = false;
609                 remove();
610             }
611         }
612     }
613 
614     private static IoUringIoHandle cast(IoHandle handle) {
615         if (handle instanceof IoUringIoHandle) {
616             return (IoUringIoHandle) handle;
617         }
618         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
619     }
620 
621     @Override
622     public void wakeup() {
623         if (!executor.isExecutorThread(Thread.currentThread()) &&
624                 !eventfdAsyncNotify.getAndSet(true)) {
625             // write to the eventfd which will then trigger an eventfd read completion.
626             Native.eventFdWrite(eventfd.intValue(), 1L);
627         }
628     }
629 
630     @Override
631     public boolean isCompatible(Class<? extends IoHandle> handleType) {
632         return IoUringIoHandle.class.isAssignableFrom(handleType);
633     }
634 
635     IovArray iovArray() {
636         if (iovArray.isFull()) {
637             // Submit so we can reuse the iovArray.
638             submitAndClear(ringBuffer.ioUringSubmissionQueue());
639         }
640         assert iovArray.count() == 0;
641         return iovArray;
642     }
643 
644     /**
645      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
646      */
647     byte[] inet4AddressArray() {
648         return inet4AddressArray;
649     }
650 
651     /**
652      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
653      */
654     byte[] inet6AddressArray() {
655         return inet6AddressArray;
656     }
657 
658     /**
659      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
660      *
661      * @return factory
662      */
663     public static IoHandlerFactory newFactory() {
664         return newFactory(new IoUringIoHandlerConfig());
665     }
666 
667     /**
668      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
669      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
670      *
671      * @param  ringSize     the size of the ring.
672      * @return              factory
673      */
674     public static IoHandlerFactory newFactory(int ringSize) {
675         IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
676         configuration.setRingSize(ringSize);
677         return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
678     }
679 
680     /**
681      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
682      * Each {@link IoUringIoHandler} will use same option
683      * @param config the io_uring configuration
684      * @return factory
685      */
686     public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
687         IoUring.ensureAvailability();
688         ObjectUtil.checkNotNull(config, "config");
689         return eventLoop -> new IoUringIoHandler(eventLoop, config);
690     }
691 }