View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoHandlerContext;
19  import io.netty.channel.IoHandle;
20  import io.netty.channel.IoHandler;
21  import io.netty.channel.IoHandlerFactory;
22  import io.netty.channel.IoOps;
23  import io.netty.channel.IoRegistration;
24  import io.netty.channel.unix.Buffer;
25  import io.netty.channel.unix.Errors;
26  import io.netty.channel.unix.FileDescriptor;
27  import io.netty.channel.unix.IovArray;
28  import io.netty.util.collection.IntObjectHashMap;
29  import io.netty.util.collection.IntObjectMap;
30  import io.netty.util.concurrent.ThreadAwareExecutor;
31  import io.netty.util.internal.CleanableDirectBuffer;
32  import io.netty.util.internal.ObjectUtil;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.io.UncheckedIOException;
39  import java.nio.ByteBuffer;
40  import java.util.ArrayList;
41  import java.util.Collection;
42  import java.util.List;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  
46  import static java.lang.Math.max;
47  import static java.lang.Math.min;
48  import static java.util.Objects.requireNonNull;
49  
50  /**
51   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
52   */
53  public final class IoUringIoHandler implements IoHandler {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);

    // The ring (submission + completion queue) created in the constructor and closed in completeRingClose().
    private final RingBuffer ringBuffer;
    // Buffer rings registered in the constructor, keyed by their buffer group id.
    private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
    // Registrations keyed by the id that is encoded into the user data of every submission they make.
    private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
    // Scratch arrays sized for an IPv4 / IPv6 address; handed out via inet4AddressArray() / inet6AddressArray()
    // as temporary storage when encoding addresses.
    private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
    private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];

    // Set to true by wakeup() before it writes to the eventfd and reset to false once the eventfd read completes,
    // so at most one wakeup write is issued per read completion.
    private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
    private final FileDescriptor eventfd;
    // Long.BYTES sized direct buffer that is the target of the eventfd read.
    private final CleanableDirectBuffer eventfdReadBufCleanable;
    private final ByteBuffer eventfdReadBuf;
    private final long eventfdReadBufAddress;
    // KERNEL_TIMESPEC_SIZE sized direct buffer that holds the timespec used for timeout submissions.
    private final CleanableDirectBuffer timeoutMemoryCleanable;
    private final ByteBuffer timeoutMemory;
    private final long timeoutMemoryAddress;
    // Shared scratch memory; both are cleared after every submit performed by this handler.
    private final IovArray iovArray;
    private final MsgHdrMemoryArray msgHdrMemoryArray;
    // Value returned by addEventFdRead(...) for the eventfd read that is in flight, or 0 if there is none.
    private long eventfdReadSubmitted;
    // Set once drainEventFd() ran; from then on no new eventfd read is submitted.
    private boolean eventFdClosing;
    // Set by prepareToDestroy(); register(...) rejects new handles once this is true.
    private volatile boolean shuttingDown;
    // Set once the ring and all native memory were released; run(...) becomes a no-op afterwards.
    private boolean closeCompleted;
    // Next candidate id for a registration, see nextRegistrationId().
    private int nextRegistrationId = Integer.MIN_VALUE;

    // These two ids are used internally and so can't be handed out by nextRegistrationId().
    private static final int EVENTFD_ID = Integer.MAX_VALUE;
    private static final int RINGFD_ID = EVENTFD_ID - 1;
    // Returned by IoRegistration.submit(...) when the registration is not valid anymore; also never handed out
    // by nextRegistrationId().
    private static final int INVALID_ID = 0;

    private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec

    // Byte offsets of the two fields that are written into the timeout memory.
    private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
    private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;

    // Executor whose thread is allowed to touch the ring; other threads hand work over via execute(...).
    private final ThreadAwareExecutor executor;
90  
    /**
     * Creates a new handler: sets up the ring, optionally registers the io-wq max workers and the configured
     * buffer rings, and allocates the eventfd plus the native scratch memory used by this handler.
     *
     * @param executor  the executor that drives this handler, must not be {@code null}.
     * @param config    the configuration to use, must not be {@code null}.
     * @throws UnsupportedOperationException if a completion queue size or buffer rings were configured but the
     *                                       corresponding io_uring feature is reported as unsupported.
     * @throws UncheckedIOException          if registering the io-wq max workers or a buffer ring fails.
     */
    IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
        // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
        IoUring.ensureAvailability();
        this.executor = requireNonNull(executor, "executor");
        requireNonNull(config, "config");
        int setupFlags = Native.setupFlags();

        // The default cq size is always twice the ringSize.
        // It only makes sense when the user actually specifies the cq ring size.
        int cqSize = 2 * config.getRingSize();
        if (config.needSetupCqeSize()) {
            if (!IoUring.isSetupCqeSizeSupported()) {
                throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
            }
            setupFlags |= Native.IORING_SETUP_CQSIZE;
            cqSize = config.checkCqSize(config.getCqSize());
        }
        this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
        // The max workers are only registered when both supported and requested; an unsupported kernel is
        // silently skipped here.
        if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
            // Negative configuration values are treated as 0.
            int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
            int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
            int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
            if (result < 0) {
                // Close ringBuffer before throwing to ensure we release all memory on failure.
                ringBuffer.close();
                throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
            }
        }

        registeredIoUringBufferRing = new IntObjectHashMap<>();
        Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
        if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
            if (!IoUring.isRegisterBufferRingSupported()) {
                // Close ringBuffer before throwing to ensure we release all memory on failure.
                ringBuffer.close();
                throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
            }
            for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
                try {
                    IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
                    registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
                } catch (Errors.NativeIoException e) {
                    // Roll back the buffer rings that were registered so far.
                    for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
                        bufferRing.close();
                    }
                    // Close ringBuffer before throwing to ensure we release all memory on failure.
                    ringBuffer.close();
                    throw new UncheckedIOException(e);
                }
            }
        }

        // NOTE(review): nothing below is guarded, so if one of these allocations throws the ring and the
        // buffer rings created above are not closed here - confirm whether that can happen in practice.
        registrations = new IntObjectHashMap<>();
        eventfd = Native.newBlockingEventFd();
        eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
        eventfdReadBuf = eventfdReadBufCleanable.buffer();
        eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
        timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
        timeoutMemory = timeoutMemoryCleanable.buffer();
        timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
        iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
        msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
    }
154 
155     @Override
156     public void initialize() {
157         ringBuffer.enable();
158         // Fill all buffer rings now.
159         for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
160             bufferRing.initialize();
161         }
162     }
163 
164     @Override
165     public int run(IoHandlerContext context) {
166         if (closeCompleted) {
167             return 0;
168         }
169         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
170         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
171         if (!completionQueue.hasCompletions() && context.canBlock()) {
172             if (eventfdReadSubmitted == 0) {
173                 submitEventFdRead();
174             }
175             long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
176             submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
177         } else {
178             // Even if we have some completions already pending we can still try to even fetch more.
179             submitAndClearNow(submissionQueue);
180         }
181         return processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
182     }
183 
    /**
     * Processes completions in a bounded loop, submitting in between so that new completions (including ones
     * that became available after a completion queue overflow) can be picked up.
     *
     * @param submissionQueue   the queue to submit from and whose flags are checked for an overflow.
     * @param completionQueue   the queue to process completions from.
     * @param callback          the callback invoked by {@code completionQueue.process(...)}.
     * @return                  the sum of the values returned by {@code completionQueue.process(...)}.
     */
    private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
                                         CompletionCallback callback) {
        int processed = 0;
        // Bound the maximum number of times this will loop before we return and so execute some non IO stuff.
        // 128 here is just some sort of bound and another number might be ok as well.
        for (int i = 0; i < 128; i++) {
            int p = completionQueue.process(callback);
            if ((submissionQueue.flags() & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
                // The overflow flag is set: log it and submit again before the next iteration.
                logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
                        completionQueue.ringEntries);
                submitAndClearNow(submissionQueue);
            } else if (p == 0 &&
                    // Let's try to submit again and check if there are new completions to handle.
                    // Only break the loop if there was nothing submitted and there are no new completions.
                    submitAndClearNow(submissionQueue) == 0 && !completionQueue.hasCompletions()) {
                // p is 0 here, so leaving before the addition below does not lose any count.
                break;
            }

            processed += p;
        }
        return processed;
    }
206 
207     private int submitAndClearNow(SubmissionQueue submissionQueue) {
208         int submitted = submissionQueue.submitAndGetNow();
209 
210         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
211         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
212         iovArray.clear();
213         msgHdrMemoryArray.clear();
214         return submitted;
215     }
216 
217     private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
218             throws Errors.NativeIoException {
219         short bufferRingSize = bufferRingConfig.bufferRingSize();
220         short bufferGroupId = bufferRingConfig.bufferGroupId();
221         int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
222         long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
223         if (ioUringBufRingAddr < 0) {
224             throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
225         }
226         return new IoUringBufferRing(ringFd,
227                 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
228                 bufferRingSize, bufferRingConfig.batchSize(),
229                 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
230                 bufferRingConfig.isBatchAllocation()
231         );
232     }
233 
234     IoUringBufferRing findBufferRing(short bgId) {
235         IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
236         if (cached != null) {
237             return cached;
238         }
239         throw new IllegalArgumentException(
240                 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
241         );
242     }
243 
244     private static void handleLoopException(Throwable throwable) {
245         logger.warn("Unexpected exception in the IO event loop.", throwable);
246 
247         // Prevent possible consecutive immediate failures that lead to
248         // excessive CPU consumption.
249         try {
250             Thread.sleep(100);
251         } catch (InterruptedException ignore) {
252             // ignore
253         }
254     }
255 
256     private boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
257         try {
258             int id = UserData.decodeId(udata);
259             byte op = UserData.decodeOp(udata);
260             short data = UserData.decodeData(udata);
261 
262             if (logger.isTraceEnabled()) {
263                 logger.trace("completed(ring {}): {}(id={}, res={})",
264                         ringBuffer.fd(), Native.opToStr(op), data, res);
265             }
266             if (id == EVENTFD_ID) {
267                 handleEventFdRead();
268                 return true;
269             }
270             if (id == RINGFD_ID) {
271                 // Just return
272                 return true;
273             }
274             DefaultIoUringIoRegistration registration = registrations.get(id);
275             if (registration == null) {
276                 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
277                         Native.opToStr(op), id, res);
278                 return true;
279             }
280             registration.handle(res, flags, op, data, extraCqeData);
281             return true;
282         } catch (Error e) {
283             throw e;
284         } catch (Throwable throwable) {
285             handleLoopException(throwable);
286             return true;
287         }
288     }
289 
290     private void handleEventFdRead() {
291         eventfdReadSubmitted = 0;
292         if (!eventFdClosing) {
293             eventfdAsyncNotify.set(false);
294             submitEventFdRead();
295         }
296     }
297 
298     private void submitEventFdRead() {
299         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
300         long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
301 
302         eventfdReadSubmitted = submissionQueue.addEventFdRead(
303                 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
304     }
305 
306     private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
307                                          boolean linkTimeout, long timeoutNanoSeconds) {
308         if (timeoutNanoSeconds != -1) {
309             long udata = UserData.encode(RINGFD_ID,
310                     linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
311             // We use the same timespec pointer for all add*Timeout operations. This only works because we call
312             // submit directly after it. This ensures the submitted timeout is considered "stable" and so can be reused.
313             long seconds, nanoSeconds;
314             if (timeoutNanoSeconds == 0) {
315                 seconds = 0;
316                 nanoSeconds = 0;
317             } else {
318                 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
319                 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
320             }
321 
322             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
323             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
324             if (linkTimeout) {
325                 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
326             } else {
327                 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
328             }
329         }
330         int submitted = submissionQueue.submitAndGet();
331         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
332         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
333         iovArray.clear();
334         msgHdrMemoryArray.clear();
335         return submitted;
336     }
337 
    /**
     * Starts the shutdown: rejects new registrations from now on, closes all registered handles and then submits
     * and processes the outstanding IO.
     */
    @Override
    public void prepareToDestroy() {
        // From now on register(...) fails with an IllegalStateException.
        shuttingDown = true;
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();

        // Iterate over a snapshot: closing a handle is expected to cancel its registration, which may remove it
        // from the registrations map.
        List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());

        for (DefaultIoUringIoRegistration registration: copy) {
            registration.close();
        }

        // Write to the eventfd to ensure that if we submitted a read for the eventfd we will see the completion event.
        Native.eventFdWrite(eventfd.intValue(), 1L);

        // Ensure all previously submitted IOs get to complete before tearing down everything.
        long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
        submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);

        // Submit everything and wait until we could drain it.
        submissionQueue.submitAndGet();

        // Keep processing as long as there are completions; handling them may queue new submissions which are
        // submitted right away.
        while (completionQueue.hasCompletions()) {
            processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
            if (submissionQueue.count() > 0) {
                submissionQueue.submitAndGetNow();
            }
        }
    }
367 
    /**
     * Tears the handler down: drains the eventfd, gives outstanding IO up to 200 milliseconds to drain, closes
     * the registered buffer rings and finally releases the ring and all native memory.
     */
    @Override
    public void destroy() {
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        drainEventFd();
        if (submissionQueue.remaining() < 2) {
            // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
            // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
            submissionQueue.submit();
        }
        // Try to drain all the IO from the queue first...
        long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
        // We need to also specify the Native.IOSQE_LINK flag for it to work as otherwise it is not correctly linked
        // with the timeout.
        // See:
        // - https://man7.org/linux/man-pages/man2/io_uring_enter.2.html
        // - https://git.kernel.dk/cgit/liburing/commit/?h=link-timeout&id=bc1bd5e97e2c758d6fd975bd35843b9b2c770c5a
        submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
        // ... but only wait for 200 milliseconds on this
        submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
        // Handle whatever completed during the wait.
        completionQueue.process(this::handle);
        // The buffer rings are closed before the ring itself is closed in completeRingClose().
        for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
            ioUringBufferRing.close();
        }
        completeRingClose();
    }
394 
    // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
    // already been freed (and potentially reallocated by the OS). Because writing wakeup events is gated on the
    // `eventfdAsyncNotify` flag we can close the gate, but may need to read any outstanding event that has been
    // (or will be) written.
    //
    // Must only be called once, see the assert on `eventFdClosing` below.
    private void drainEventFd() {
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
        assert !eventFdClosing;
        eventFdClosing = true;
        // Close the gate: after this wakeup() never writes to the eventfd again. The previous value tells us
        // whether a write already happened (or is about to happen).
        boolean eventPending = eventfdAsyncNotify.getAndSet(true);
        if (eventPending) {
            // There is an event that has been or will be written by another thread, so we must wait for the event.
            // Make sure we're actually listening for writes to the event fd.
            while (eventfdReadSubmitted == 0) {
                submitEventFdRead();
                submissionQueue.submit();
            }
            // Drain the eventfd of the pending wakeup.
            class DrainFdEventCallback implements CompletionCallback {
                // Set once a completion carrying the EVENTFD_ID was observed.
                boolean eventFdDrained;

                @Override
                public boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
                    if (UserData.decodeId(udata) == EVENTFD_ID) {
                        eventFdDrained = true;
                    }
                    // Every completion is still handled the regular way.
                    return IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
                }
            }
            final DrainFdEventCallback handler = new DrainFdEventCallback();
            completionQueue.process(handler);
            // Keep submitting and processing until the eventfd completion showed up.
            while (!handler.eventFdDrained) {
                submissionQueue.submitAndGet();
                processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
            }
        }
        // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
        // transition back to false, thus we should never have any more events written.
        // So, if we have a read event pending, we can cancel it.
        if (eventfdReadSubmitted != 0) {
            long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
            submissionQueue.addCancel(eventfdReadSubmitted, udata);
            eventfdReadSubmitted = 0;
            submissionQueue.submit();
        }
    }
441 
442     private void completeRingClose() {
443         if (closeCompleted) {
444             // already done.
445             return;
446         }
447         closeCompleted = true;
448         ringBuffer.close();
449         try {
450             eventfd.close();
451         } catch (IOException e) {
452             logger.warn("Failed to close eventfd", e);
453         }
454         eventfdReadBufCleanable.clean();
455         timeoutMemoryCleanable.clean();
456         iovArray.release();
457         msgHdrMemoryArray.release();
458     }
459 
460     @Override
461     public IoRegistration register(IoHandle handle) throws Exception {
462         IoUringIoHandle ioHandle = cast(handle);
463         if (shuttingDown) {
464             throw new IllegalStateException("IoUringIoHandler is shutting down");
465         }
466         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
467         for (;;) {
468             int id = nextRegistrationId();
469             DefaultIoUringIoRegistration old = registrations.put(id, registration);
470             if (old != null) {
471                 assert old.handle != registration.handle;
472                 registrations.put(id, old);
473             } else {
474                 registration.setId(id);
475                 break;
476             }
477         }
478 
479         return registration;
480     }
481 
482     private int nextRegistrationId() {
483         int id;
484         do {
485             id = nextRegistrationId++;
486         } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
487         return id;
488     }
489 
490     private final class DefaultIoUringIoRegistration implements IoRegistration {
491         private final AtomicBoolean canceled = new AtomicBoolean();
492         private final ThreadAwareExecutor executor;
493         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
494         final IoUringIoHandle handle;
495 
496         private boolean removeLater;
497         private int outstandingCompletions;
498         private int id;
499 
500         DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
501             this.executor = executor;
502             this.handle = handle;
503         }
504 
505         void setId(int id) {
506             this.id = id;
507         }
508 
509         @Override
510         public long submit(IoOps ops) {
511             IoUringIoOps ioOps = (IoUringIoOps) ops;
512             if (!isValid()) {
513                 return INVALID_ID;
514             }
515             if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
516                 // Because we expect at least 1 completion per submission we can't support IOSQE_CQE_SKIP_SUCCESS
517                 // as it will only produce a completion on failure.
518                 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
519             }
520             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
521             if (executor.isExecutorThread(Thread.currentThread())) {
522                 submit0(ioOps, udata);
523             } else {
524                 executor.execute(() -> submit0(ioOps, udata));
525             }
526             return udata;
527         }
528 
529         private void submit0(IoUringIoOps ioOps, long udata) {
530             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
531                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
532                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
533             );
534             outstandingCompletions++;
535         }
536 
537         @SuppressWarnings("unchecked")
538         @Override
539         public <T> T attachment() {
540             return (T) IoUringIoHandler.this;
541         }
542 
543         @Override
544         public boolean isValid() {
545             return !canceled.get();
546         }
547 
548         @Override
549         public boolean cancel() {
550             if (!canceled.compareAndSet(false, true)) {
551                 // Already cancelled.
552                 return false;
553             }
554             if (executor.isExecutorThread(Thread.currentThread())) {
555                 tryRemove();
556             } else {
557                 executor.execute(this::tryRemove);
558             }
559             return true;
560         }
561 
562         private void tryRemove() {
563             if (outstandingCompletions > 0) {
564                 // We have some completions outstanding, we will remove the id <-> registration mapping
565                 // once these are done.
566                 removeLater = true;
567                 return;
568             }
569             remove();
570         }
571 
572         private void remove() {
573             DefaultIoUringIoRegistration old = registrations.remove(id);
574             assert old == this;
575         }
576 
577         void close() {
578             // Closing the handle will also cancel the registration.
579             // It's important that we not manually cancel as close() might need to submit some work to the ring.
580             assert executor.isExecutorThread(Thread.currentThread());
581             try {
582                 handle.close();
583             } catch (Exception e) {
584                 logger.debug("Exception during closing " + handle, e);
585             }
586         }
587 
588         void handle(int res, int flags, byte op, short data, ByteBuffer extraCqeData) {
589             event.update(res, flags, op, data, extraCqeData);
590             handle.handle(this, event);
591             // Only decrement outstandingCompletions if IORING_CQE_F_MORE is not set as otherwise we know that we will
592             // receive more completions for the intial request.
593             if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
594                 // No more outstanding completions, remove the fd <-> registration mapping now.
595                 removeLater = false;
596                 remove();
597             }
598         }
599     }
600 
601     private static IoUringIoHandle cast(IoHandle handle) {
602         if (handle instanceof IoUringIoHandle) {
603             return (IoUringIoHandle) handle;
604         }
605         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
606     }
607 
608     @Override
609     public void wakeup() {
610         if (!executor.isExecutorThread(Thread.currentThread()) &&
611                 !eventfdAsyncNotify.getAndSet(true)) {
612             // write to the eventfd which will then trigger an eventfd read completion.
613             Native.eventFdWrite(eventfd.intValue(), 1L);
614         }
615     }
616 
617     @Override
618     public boolean isCompatible(Class<? extends IoHandle> handleType) {
619         return IoUringIoHandle.class.isAssignableFrom(handleType);
620     }
621 
622     IovArray iovArray() {
623         if (iovArray.isFull()) {
624             // Submit so we can reuse the iovArray.
625             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
626         }
627         assert iovArray.count() == 0;
628         return iovArray;
629     }
630 
631     MsgHdrMemoryArray msgHdrMemoryArray() {
632         if (msgHdrMemoryArray.isFull()) {
633             // Submit so we can reuse the msgHdrArray.
634             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
635         }
636         return msgHdrMemoryArray;
637     }
638 
639     /**
640      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
641      */
642     byte[] inet4AddressArray() {
643         return inet4AddressArray;
644     }
645 
646     /**
647      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
648      */
649     byte[] inet6AddressArray() {
650         return inet6AddressArray;
651     }
652 
653     /**
654      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
655      *
656      * @return factory
657      */
658     public static IoHandlerFactory newFactory() {
659         return newFactory(new IoUringIoHandlerConfig());
660     }
661 
662     /**
663      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
664      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
665      *
666      * @param  ringSize     the size of the ring.
667      * @return              factory
668      */
669     public static IoHandlerFactory newFactory(int ringSize) {
670         IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
671         configuration.setRingSize(ringSize);
672         return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
673     }
674 
675     /**
676      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
677      * Each {@link IoUringIoHandler} will use same option
678      * @param config the io_uring configuration
679      * @return factory
680      */
681     public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
682         IoUring.ensureAvailability();
683         ObjectUtil.checkNotNull(config, "config");
684         return eventLoop -> new IoUringIoHandler(eventLoop, config);
685     }
686 }