View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoHandle;
19  import io.netty.channel.IoHandler;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandlerFactory;
22  import io.netty.channel.IoOps;
23  import io.netty.channel.IoRegistration;
24  import io.netty.channel.unix.Buffer;
25  import io.netty.channel.unix.Errors;
26  import io.netty.channel.unix.FileDescriptor;
27  import io.netty.channel.unix.IovArray;
28  import io.netty.util.collection.IntObjectHashMap;
29  import io.netty.util.collection.IntObjectMap;
30  import io.netty.util.concurrent.ThreadAwareExecutor;
31  import io.netty.util.internal.CleanableDirectBuffer;
32  import io.netty.util.internal.ObjectUtil;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.io.UncheckedIOException;
39  import java.nio.ByteBuffer;
40  import java.util.ArrayList;
41  import java.util.Collection;
42  import java.util.List;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  
46  import static java.lang.Math.max;
47  import static java.lang.Math.min;
48  import static java.util.Objects.requireNonNull;
49  
50  /**
51   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
52   */
53  public final class IoUringIoHandler implements IoHandler {
54      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55  
56      private final RingBuffer ringBuffer;
57      private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58      private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59      // Scratch arrays sized to hold the raw bytes of an IPv4 / IPv6 address
60      private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61      private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62  
63      private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64      private final FileDescriptor eventfd;
65      private final CleanableDirectBuffer eventfdReadBufCleanable;
66      private final ByteBuffer eventfdReadBuf;
67      private final long eventfdReadBufAddress;
68      private final CleanableDirectBuffer timeoutMemoryCleanable;
69      private final ByteBuffer timeoutMemory;
70      private final long timeoutMemoryAddress;
71      private final IovArray iovArray;
72      private final MsgHdrMemoryArray msgHdrMemoryArray;
73      private long eventfdReadSubmitted;
74      private boolean eventFdClosing;
75      private volatile boolean shuttingDown;
76      private boolean closeCompleted;
77      private int nextRegistrationId = Integer.MIN_VALUE;
78  
79      // These two ids are used internally and so can't be handed out by nextRegistrationId().
80      private static final int EVENTFD_ID = Integer.MAX_VALUE;
81      private static final int RINGFD_ID = EVENTFD_ID - 1;
82      private static final int INVALID_ID = 0;
83  
84      private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
85  
86      private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
87      private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
88  
89      private final ThreadAwareExecutor executor;
90  
91      IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
92          // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
93          IoUring.ensureAvailability();
94          this.executor = requireNonNull(executor, "executor");
95          requireNonNull(config, "config");
96          int setupFlags = Native.setupFlags(config.singleIssuer());
97  
98          //The default cq size is always twice the ringSize.
99          // It only makes sense when the user actually specifies the cq ring size.
100         int cqSize = 2 * config.getRingSize();
101         if (config.needSetupCqeSize()) {
102             assert IoUring.isSetupCqeSizeSupported();
103             setupFlags |= Native.IORING_SETUP_CQSIZE;
104             cqSize = config.getCqSize();
105         }
106         this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
107         if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
108             int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
109             int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
110             int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
111             if (result < 0) {
112                 // Close ringBuffer before throwing to ensure we release all memory on failure.
113                 ringBuffer.close();
114                 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
115             }
116         }
117 
118         registeredIoUringBufferRing = new IntObjectHashMap<>();
119         Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
120         if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
121             for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
122                 try {
123                     IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
124                     registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
125                 } catch (Errors.NativeIoException e) {
126                     for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
127                         bufferRing.close();
128                     }
129                     // Close ringBuffer before throwing to ensure we release all memory on failure.
130                     ringBuffer.close();
131                     throw new UncheckedIOException(e);
132                 }
133             }
134         }
135 
136         registrations = new IntObjectHashMap<>();
137         eventfd = Native.newBlockingEventFd();
138         eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
139         eventfdReadBuf = eventfdReadBufCleanable.buffer();
140         eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
141         timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
142         timeoutMemory = timeoutMemoryCleanable.buffer();
143         timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
144         iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
145         msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
146     }
147 
148     @Override
149     public void initialize() {
150         ringBuffer.enable();
151         // Fill all buffer rings now.
152         for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
153             bufferRing.initialize();
154         }
155     }
156 
157     @Override
158     public int run(IoHandlerContext context) {
159         if (closeCompleted) {
160             if (context.shouldReportActiveIoTime()) {
161                 context.reportActiveIoTime(0);
162             }
163             return 0;
164         }
165         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
166         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
167         if (!completionQueue.hasCompletions() && context.canBlock()) {
168             if (eventfdReadSubmitted == 0) {
169                 submitEventFdRead();
170             }
171             long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
172             submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
173         } else {
174             // Even if we have some completions already pending we can still try to even fetch more.
175             submitAndClearNow(submissionQueue);
176         }
177 
178         int processed;
179         if (context.shouldReportActiveIoTime()) {
180             // Timer starts after the blocking wait, around the processing of completions.
181             long activeIoStartTimeNanos = System.nanoTime();
182             processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
183             long activeIoEndTimeNanos = System.nanoTime();
184             context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
185         } else {
186             processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
187         }
188         return processed;
189     }
190 
191     private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
192                                          CompletionCallback callback) {
193         int processed = 0;
194         // Bound the maximum number of times this will loop before we return and so execute some non IO stuff.
195         // 128 here is just some sort of bound and another number might be ok as well.
196         for (int i = 0; i < 128; i++) {
197             int p = completionQueue.process(callback);
198             if ((submissionQueue.flags() & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
199                 logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
200                         completionQueue.ringEntries);
201                 submitAndClearNow(submissionQueue);
202             } else if (p == 0 &&
203                     // Check if there are any more submissions pending, if not break the loop.
204                     (submissionQueue.count() == 0 ||
205                     // Let's try to submit again and check if there are new completions to handle.
206                     // Only break the loop if there was nothing submitted and there are no new completions.
207                     (submitAndClearNow(submissionQueue) == 0 && !completionQueue.hasCompletions()))) {
208                 break;
209             }
210             processed += p;
211         }
212         return processed;
213     }
214 
215     private int submitAndClearNow(SubmissionQueue submissionQueue) {
216         int submitted = submissionQueue.submitAndGetNow();
217 
218         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
219         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
220         iovArray.clear();
221         msgHdrMemoryArray.clear();
222         return submitted;
223     }
224 
225     private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
226             throws Errors.NativeIoException {
227         short bufferRingSize = bufferRingConfig.bufferRingSize();
228         short bufferGroupId = bufferRingConfig.bufferGroupId();
229         int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
230         long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
231         if (ioUringBufRingAddr < 0) {
232             throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
233         }
234         return new IoUringBufferRing(ringFd,
235                 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
236                 bufferRingSize, bufferRingConfig.batchSize(),
237                 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
238                 bufferRingConfig.isBatchAllocation()
239         );
240     }
241 
242     IoUringBufferRing findBufferRing(short bgId) {
243         IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
244         if (cached != null) {
245             return cached;
246         }
247         throw new IllegalArgumentException(
248                 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
249         );
250     }
251 
252     private static void handleLoopException(Throwable throwable) {
253         logger.warn("Unexpected exception in the IO event loop.", throwable);
254 
255         // Prevent possible consecutive immediate failures that lead to
256         // excessive CPU consumption.
257         try {
258             Thread.sleep(100);
259         } catch (InterruptedException ignore) {
260             // ignore
261         }
262     }
263 
264     private boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
265         try {
266             int id = UserData.decodeId(udata);
267             byte op = UserData.decodeOp(udata);
268             short data = UserData.decodeData(udata);
269 
270             if (logger.isTraceEnabled()) {
271                 logger.trace("completed(ring {}): {}(id={}, res={})",
272                         ringBuffer.fd(), Native.opToStr(op), data, res);
273             }
274             if (id == EVENTFD_ID) {
275                 handleEventFdRead();
276                 return true;
277             }
278             if (id == RINGFD_ID) {
279                 // Just return
280                 return true;
281             }
282             DefaultIoUringIoRegistration registration = registrations.get(id);
283             if (registration == null) {
284                 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
285                         Native.opToStr(op), id, res);
286                 return true;
287             }
288             registration.handle(res, flags, op, data, extraCqeData);
289             return true;
290         } catch (Error e) {
291             throw e;
292         } catch (Throwable throwable) {
293             handleLoopException(throwable);
294             return true;
295         }
296     }
297 
298     private void handleEventFdRead() {
299         eventfdReadSubmitted = 0;
300         if (!eventFdClosing) {
301             eventfdAsyncNotify.set(false);
302             submitEventFdRead();
303         }
304     }
305 
306     private void submitEventFdRead() {
307         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
308         long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
309 
310         eventfdReadSubmitted = submissionQueue.addEventFdRead(
311                 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
312     }
313 
314     private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
315                                          boolean linkTimeout, long timeoutNanoSeconds) {
316         if (timeoutNanoSeconds != -1) {
317             long udata = UserData.encode(RINGFD_ID,
318                     linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
319             // We use the same timespec pointer for all add*Timeout operations. This only works because we call
320             // submit directly after it. This ensures the submitted timeout is considered "stable" and so can be reused.
321             long seconds, nanoSeconds;
322             if (timeoutNanoSeconds == 0) {
323                 seconds = 0;
324                 nanoSeconds = 0;
325             } else {
326                 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
327                 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
328             }
329 
330             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
331             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
332             if (linkTimeout) {
333                 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
334             } else {
335                 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
336             }
337         }
338         int submitted = submissionQueue.submitAndGet();
339         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
340         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
341         iovArray.clear();
342         msgHdrMemoryArray.clear();
343         return submitted;
344     }
345 
346     @Override
347     public void prepareToDestroy() {
348         shuttingDown = true;
349         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
350         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
351 
352         List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
353 
354         for (DefaultIoUringIoRegistration registration: copy) {
355             registration.close();
356         }
357 
358         // Write to the eventfd to ensure that if we submitted a read for the eventfd we will see the completion event.
359         Native.eventFdWrite(eventfd.intValue(), 1L);
360 
361         // Ensure all previously submitted IOs get to complete before tearing down everything.
362         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
363         submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
364 
365         // Submit everything and wait until we could drain i.
366         submissionQueue.submitAndGet();
367 
368         while (completionQueue.hasCompletions()) {
369             processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
370             if (submissionQueue.count() > 0) {
371                 submissionQueue.submitAndGetNow();
372             }
373         }
374     }
375 
376     @Override
377     public void destroy() {
378         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
379         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
380         drainEventFd();
381         if (submissionQueue.remaining() < 2) {
382             // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
383             // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
384             submissionQueue.submit();
385         }
386         // Try to drain all the IO from the queue first...
387         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
388         // We need to also specify the Native.IOSQE_LINK flag for it to work as otherwise it is not correctly linked
389         // with the timeout.
390         // See:
391         // - https://man7.org/linux/man-pages/man2/io_uring_enter.2.html
392         // - https://git.kernel.dk/cgit/liburing/commit/?h=link-timeout&id=bc1bd5e97e2c758d6fd975bd35843b9b2c770c5a
393         submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
394         // ... but only wait for 200 milliseconds on this
395         submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
396         completionQueue.process(this::handle);
397         for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
398             ioUringBufferRing.close();
399         }
400         completeRingClose();
401     }
402 
403     // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
404     // already been freed (and potentially reallocated by the OS). Because submitted events is gated on the
405     // `eventfdAsyncNotify` flag we can close the gate but may need to read any outstanding events that have
406     // (or will) be written.
407     private void drainEventFd() {
408         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
409         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
410         assert !eventFdClosing;
411         eventFdClosing = true;
412         boolean eventPending = eventfdAsyncNotify.getAndSet(true);
413         if (eventPending) {
414             // There is an event that has been or will be written by another thread, so we must wait for the event.
415             // Make sure we're actually listening for writes to the event fd.
416             while (eventfdReadSubmitted == 0) {
417                 submitEventFdRead();
418                 submissionQueue.submit();
419             }
420             // Drain the eventfd of the pending wakup.
421             class DrainFdEventCallback implements CompletionCallback {
422                 boolean eventFdDrained;
423 
424                 @Override
425                 public boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
426                     if (UserData.decodeId(udata) == EVENTFD_ID) {
427                         eventFdDrained = true;
428                     }
429                     return IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
430                 }
431             }
432             final DrainFdEventCallback handler = new DrainFdEventCallback();
433             completionQueue.process(handler);
434             while (!handler.eventFdDrained) {
435                 submissionQueue.submitAndGet();
436                 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
437             }
438         }
439         // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
440         // transition back to false, thus we should never have any more events written.
441         // So, if we have a read event pending, we can cancel it.
442         if (eventfdReadSubmitted != 0) {
443             long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
444             submissionQueue.addCancel(eventfdReadSubmitted, udata);
445             eventfdReadSubmitted = 0;
446             submissionQueue.submit();
447         }
448     }
449 
450     private void completeRingClose() {
451         if (closeCompleted) {
452             // already done.
453             return;
454         }
455         closeCompleted = true;
456         ringBuffer.close();
457         try {
458             eventfd.close();
459         } catch (IOException e) {
460             logger.warn("Failed to close eventfd", e);
461         }
462         eventfdReadBufCleanable.clean();
463         timeoutMemoryCleanable.clean();
464         iovArray.release();
465         msgHdrMemoryArray.release();
466     }
467 
468     @Override
469     public IoRegistration register(IoHandle handle) throws Exception {
470         IoUringIoHandle ioHandle = cast(handle);
471         if (shuttingDown) {
472             throw new IllegalStateException("IoUringIoHandler is shutting down");
473         }
474         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
475         for (;;) {
476             int id = nextRegistrationId();
477             DefaultIoUringIoRegistration old = registrations.put(id, registration);
478             if (old != null) {
479                 assert old.handle != registration.handle;
480                 registrations.put(id, old);
481             } else {
482                 registration.setId(id);
483                 ioHandle.registered();
484                 break;
485             }
486         }
487 
488         return registration;
489     }
490 
491     private int nextRegistrationId() {
492         int id;
493         do {
494             id = nextRegistrationId++;
495         } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
496         return id;
497     }
498 
499     private final class DefaultIoUringIoRegistration implements IoRegistration {
500         private final AtomicBoolean canceled = new AtomicBoolean();
501         private final ThreadAwareExecutor executor;
502         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
503         final IoUringIoHandle handle;
504 
505         private boolean removeLater;
506         private int outstandingCompletions;
507         private int id;
508 
509         DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
510             this.executor = executor;
511             this.handle = handle;
512         }
513 
514         void setId(int id) {
515             this.id = id;
516         }
517 
518         @Override
519         public long submit(IoOps ops) {
520             IoUringIoOps ioOps = (IoUringIoOps) ops;
521             if (!isValid()) {
522                 return INVALID_ID;
523             }
524             if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
525                 // Because we expect at least 1 completion per submission we can't support IOSQE_CQE_SKIP_SUCCESS
526                 // as it will only produce a completion on failure.
527                 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
528             }
529             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
530             if (executor.isExecutorThread(Thread.currentThread())) {
531                 submit0(ioOps, udata);
532             } else {
533                 executor.execute(() -> submit0(ioOps, udata));
534             }
535             return udata;
536         }
537 
538         private void submit0(IoUringIoOps ioOps, long udata) {
539             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
540                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
541                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
542             );
543             outstandingCompletions++;
544         }
545 
546         @SuppressWarnings("unchecked")
547         @Override
548         public <T> T attachment() {
549             return (T) IoUringIoHandler.this;
550         }
551 
552         @Override
553         public boolean isValid() {
554             return !canceled.get();
555         }
556 
557         @Override
558         public boolean cancel() {
559             if (!canceled.compareAndSet(false, true)) {
560                 // Already cancelled.
561                 return false;
562             }
563             if (executor.isExecutorThread(Thread.currentThread())) {
564                 tryRemove();
565             } else {
566                 executor.execute(this::tryRemove);
567             }
568             return true;
569         }
570 
571         private void tryRemove() {
572             if (outstandingCompletions > 0) {
573                 // We have some completions outstanding, we will remove the id <-> registration mapping
574                 // once these are done.
575                 removeLater = true;
576                 return;
577             }
578             remove();
579         }
580 
581         private void remove() {
582             DefaultIoUringIoRegistration old = registrations.remove(id);
583             assert old == this;
584             handle.unregistered();
585         }
586 
587         void close() {
588             // Closing the handle will also cancel the registration.
589             // It's important that we not manually cancel as close() might need to submit some work to the ring.
590             assert executor.isExecutorThread(Thread.currentThread());
591             try {
592                 handle.close();
593             } catch (Exception e) {
594                 logger.debug("Exception during closing " + handle, e);
595             }
596         }
597 
598         void handle(int res, int flags, byte op, short data, ByteBuffer extraCqeData) {
599             event.update(res, flags, op, data, extraCqeData);
600             handle.handle(this, event);
601             // Only decrement outstandingCompletions if IORING_CQE_F_MORE is not set as otherwise we know that we will
602             // receive more completions for the intial request.
603             if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
604                 // No more outstanding completions, remove the fd <-> registration mapping now.
605                 removeLater = false;
606                 remove();
607             }
608         }
609     }
610 
611     private static IoUringIoHandle cast(IoHandle handle) {
612         if (handle instanceof IoUringIoHandle) {
613             return (IoUringIoHandle) handle;
614         }
615         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
616     }
617 
618     @Override
619     public void wakeup() {
620         if (!executor.isExecutorThread(Thread.currentThread()) &&
621                 !eventfdAsyncNotify.getAndSet(true)) {
622             // write to the eventfd which will then trigger an eventfd read completion.
623             Native.eventFdWrite(eventfd.intValue(), 1L);
624         }
625     }
626 
627     @Override
628     public boolean isCompatible(Class<? extends IoHandle> handleType) {
629         return IoUringIoHandle.class.isAssignableFrom(handleType);
630     }
631 
632     IovArray iovArray() {
633         if (iovArray.isFull()) {
634             // Submit so we can reuse the iovArray.
635             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
636         }
637         assert iovArray.count() == 0;
638         return iovArray;
639     }
640 
641     MsgHdrMemoryArray msgHdrMemoryArray() {
642         if (msgHdrMemoryArray.isFull()) {
643             // Submit so we can reuse the msgHdrArray.
644             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
645         }
646         return msgHdrMemoryArray;
647     }
648 
649     /**
650      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
651      */
652     byte[] inet4AddressArray() {
653         return inet4AddressArray;
654     }
655 
656     /**
657      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
658      */
659     byte[] inet6AddressArray() {
660         return inet6AddressArray;
661     }
662 
663     /**
664      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
665      *
666      * @return factory
667      */
668     public static IoHandlerFactory newFactory() {
669         return newFactory(new IoUringIoHandlerConfig());
670     }
671 
672     /**
673      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
674      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
675      *
676      * @param  ringSize     the size of the ring.
677      * @return              factory
678      */
679     public static IoHandlerFactory newFactory(int ringSize) {
680         IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
681         configuration.setRingSize(ringSize);
682         return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
683     }
684 
685     /**
686      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
687      * Each {@link IoUringIoHandler} will use same option
688      * @param config the io_uring configuration
689      * @return factory
690      */
691     public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
692         IoUring.ensureAvailability();
693         final IoUringIoHandlerConfig copy = ObjectUtil.checkNotNull(config, "config").verifyAndClone();
694         return new IoHandlerFactory() {
695             @Override
696             public IoHandler newHandler(ThreadAwareExecutor eventLoop) {
697                 return new IoUringIoHandler(eventLoop, copy);
698             }
699 
700             @Override
701             public boolean isChangingThreadSupported() {
702                 return !copy.singleIssuer();
703             }
704         };
705     }
706 }