View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoHandlerContext;
19  import io.netty.channel.IoHandle;
20  import io.netty.channel.IoHandler;
21  import io.netty.channel.IoHandlerFactory;
22  import io.netty.channel.IoOps;
23  import io.netty.channel.IoRegistration;
24  import io.netty.channel.unix.Buffer;
25  import io.netty.channel.unix.Errors;
26  import io.netty.channel.unix.FileDescriptor;
27  import io.netty.channel.unix.IovArray;
28  import io.netty.util.collection.IntObjectHashMap;
29  import io.netty.util.collection.IntObjectMap;
30  import io.netty.util.concurrent.ThreadAwareExecutor;
31  import io.netty.util.internal.CleanableDirectBuffer;
32  import io.netty.util.internal.ObjectUtil;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.io.UncheckedIOException;
39  import java.nio.ByteBuffer;
40  import java.util.ArrayList;
41  import java.util.Collection;
42  import java.util.List;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  
46  import static java.lang.Math.max;
47  import static java.lang.Math.min;
48  import static java.util.Objects.requireNonNull;
49  
50  /**
51   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
52   */
53  public final class IoUringIoHandler implements IoHandler {
54      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55  
56      private final RingBuffer ringBuffer;
57      private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58      private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59      // The maximum number of bytes for an InetAddress / Inet6Address
60      private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61      private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62  
63      private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64      private final FileDescriptor eventfd;
65      private final CleanableDirectBuffer eventfdReadBufCleanable;
66      private final ByteBuffer eventfdReadBuf;
67      private final long eventfdReadBufAddress;
68      private final CleanableDirectBuffer timeoutMemoryCleanable;
69      private final ByteBuffer timeoutMemory;
70      private final long timeoutMemoryAddress;
71      private final IovArray iovArray;
72      private long eventfdReadSubmitted;
73      private boolean eventFdClosing;
74      private volatile boolean shuttingDown;
75      private boolean closeCompleted;
76      private int nextRegistrationId = Integer.MIN_VALUE;
77  
78      // These two ids are used internally and so can't be handed out by nextRegistrationId().
79      private static final int EVENTFD_ID = Integer.MAX_VALUE;
80      private static final int RINGFD_ID = EVENTFD_ID - 1;
81      private static final int INVALID_ID = 0;
82  
83      private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
84  
85      private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
86      private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
87  
88      private final ThreadAwareExecutor executor;
89  
90      IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
91          // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
92          IoUring.ensureAvailability();
93          this.executor = requireNonNull(executor, "executor");
94          requireNonNull(config, "config");
95          int setupFlags = Native.setupFlags();
96  
97          //The default cq size is always twice the ringSize.
98          // It only makes sense when the user actually specifies the cq ring size.
99          int cqSize = 2 * config.getRingSize();
100         if (config.needSetupCqeSize()) {
101             if (!IoUring.isSetupCqeSizeSupported()) {
102                 throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
103             }
104             setupFlags |= Native.IORING_SETUP_CQSIZE;
105             cqSize = config.checkCqSize(config.getCqSize());
106         }
107         this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
108         if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
109             int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
110             int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
111             int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
112             if (result < 0) {
113                 // Close ringBuffer before throwing to ensure we release all memory on failure.
114                 ringBuffer.close();
115                 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
116             }
117         }
118 
119         registeredIoUringBufferRing = new IntObjectHashMap<>();
120         Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
121         if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
122             if (!IoUring.isRegisterBufferRingSupported()) {
123                 // Close ringBuffer before throwing to ensure we release all memory on failure.
124                 ringBuffer.close();
125                 throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
126             }
127             for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
128                 try {
129                     IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
130                     registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
131                 } catch (Errors.NativeIoException e) {
132                     for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
133                         bufferRing.close();
134                     }
135                     // Close ringBuffer before throwing to ensure we release all memory on failure.
136                     ringBuffer.close();
137                     throw new UncheckedIOException(e);
138                 }
139             }
140         }
141 
142         registrations = new IntObjectHashMap<>();
143         eventfd = Native.newBlockingEventFd();
144         eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
145         eventfdReadBuf = eventfdReadBufCleanable.buffer();
146         eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
147         timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
148         timeoutMemory = timeoutMemoryCleanable.buffer();
149         timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
150         iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
151     }
152 
153     @Override
154     public void initialize() {
155         ringBuffer.enable();
156         // Fill all buffer rings now.
157         for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
158             bufferRing.initialize();
159         }
160     }
161 
162     @Override
163     public int run(IoHandlerContext context) {
164         if (closeCompleted) {
165             return 0;
166         }
167         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
168         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
169         if (!completionQueue.hasCompletions() && context.canBlock()) {
170             if (eventfdReadSubmitted == 0) {
171                 submitEventFdRead();
172             }
173             long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
174             submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
175         } else {
176             // Even if we have some completions already pending we can still try to even fetch more.
177             submitAndClearNow(submissionQueue);
178         }
179         return completionQueue.process(this::handle);
180     }
181 
182     private int submitAndClearNow(SubmissionQueue submissionQueue) {
183         int submitted = submissionQueue.submitAndGetNow();
184 
185         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
186         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
187         iovArray.clear();
188         return submitted;
189     }
190 
191     private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
192             throws Errors.NativeIoException {
193         short bufferRingSize = bufferRingConfig.bufferRingSize();
194         short bufferGroupId = bufferRingConfig.bufferGroupId();
195         int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
196         long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
197         if (ioUringBufRingAddr < 0) {
198             throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
199         }
200         return new IoUringBufferRing(ringFd,
201                 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
202                 bufferRingSize, bufferRingConfig.batchSize(),
203                 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
204                 bufferRingConfig.isBatchAllocation()
205         );
206     }
207 
208     IoUringBufferRing findBufferRing(short bgId) {
209         IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
210         if (cached != null) {
211             return cached;
212         }
213         throw new IllegalArgumentException(
214                 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
215         );
216     }
217 
218     private static void handleLoopException(Throwable throwable) {
219         logger.warn("Unexpected exception in the IO event loop.", throwable);
220 
221         // Prevent possible consecutive immediate failures that lead to
222         // excessive CPU consumption.
223         try {
224             Thread.sleep(100);
225         } catch (InterruptedException ignore) {
226             // ignore
227         }
228     }
229 
230     private boolean handle(int res, int flags, long udata) {
231         try {
232             int id = UserData.decodeId(udata);
233             byte op = UserData.decodeOp(udata);
234             short data = UserData.decodeData(udata);
235 
236             if (logger.isTraceEnabled()) {
237                 logger.trace("completed(ring {}): {}(id={}, res={})",
238                         ringBuffer.fd(), Native.opToStr(op), data, res);
239             }
240             if (id == EVENTFD_ID) {
241                 handleEventFdRead();
242                 return true;
243             }
244             if (id == RINGFD_ID) {
245                 // Just return
246                 return true;
247             }
248             DefaultIoUringIoRegistration registration = registrations.get(id);
249             if (registration == null) {
250                 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
251                         Native.opToStr(op), id, res);
252                 return true;
253             }
254             registration.handle(res, flags, op, data);
255             return true;
256         } catch (Error e) {
257             throw e;
258         } catch (Throwable throwable) {
259             handleLoopException(throwable);
260             return true;
261         }
262     }
263 
264     private void handleEventFdRead() {
265         eventfdReadSubmitted = 0;
266         if (!eventFdClosing) {
267             eventfdAsyncNotify.set(false);
268             submitEventFdRead();
269         }
270     }
271 
272     private void submitEventFdRead() {
273         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
274         long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
275 
276         eventfdReadSubmitted = submissionQueue.addEventFdRead(
277                 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
278     }
279 
280     private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
281                                          boolean linkTimeout, long timeoutNanoSeconds) {
282         if (timeoutNanoSeconds != -1) {
283             long udata = UserData.encode(RINGFD_ID,
284                     linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
285             // We use the same timespec pointer for all add*Timeout operations. This only works because we call
286             // submit directly after it. This ensures the submitted timeout is considered "stable" and so can be reused.
287             long seconds, nanoSeconds;
288             if (timeoutNanoSeconds == 0) {
289                 seconds = 0;
290                 nanoSeconds = 0;
291             } else {
292                 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
293                 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
294             }
295 
296             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
297             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
298             if (linkTimeout) {
299                 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
300             } else {
301                 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
302             }
303         }
304         int submitted = submissionQueue.submitAndGet();
305         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
306         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
307         iovArray.clear();
308         return submitted;
309     }
310 
    /**
     * First phase of the shutdown: marks the handler as shutting down (so {@code register} rejects new
     * handles), closes all current registrations and then submits and processes what is outstanding.
     */
    @Override
    public void prepareToDestroy() {
        shuttingDown = true;
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();

        // Iterate over a copy of the registrations rather than the live map view while closing them.
        List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());

        for (DefaultIoUringIoRegistration registration: copy) {
            registration.close();
        }

        // Write to the eventfd to ensure that if we submitted a read for the eventfd we will see the completion event.
        Native.eventFdWrite(eventfd.intValue(), 1L);

        // Ensure all previously submitted IOs get to complete before tearing down everything.
        long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
        submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);

        // Submit everything and wait until we could drain it.
        submissionQueue.submitAndGet();
        while (completionQueue.hasCompletions()) {
            completionQueue.process(this::handle);

            // Handling completions may have queued new submissions; push them out without waiting.
            if (submissionQueue.count() > 0) {
                submissionQueue.submitAndGetNow();
            }
        }
    }
340 
    /**
     * Final phase of the shutdown: drains the eventfd, gives outstanding IO a bounded amount of time to
     * complete, closes all buffer rings and finally releases the ring and the remaining native resources.
     */
    @Override
    public void destroy() {
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        drainEventFd();
        if (submissionQueue.remaining() < 2) {
            // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
            // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
            submissionQueue.submit();
        }
        // Try to drain all the IO from the queue first...
        long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
        // We need to also specify the Native.IOSQE_LINK flag for it to work as otherwise it is not correctly linked
        // with the timeout.
        // See:
        // - https://man7.org/linux/man-pages/man2/io_uring_enter.2.html
        // - https://git.kernel.dk/cgit/liburing/commit/?h=link-timeout&id=bc1bd5e97e2c758d6fd975bd35843b9b2c770c5a
        submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
        // ... but only wait for 200 milliseconds on this
        submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
        completionQueue.process(this::handle);
        // The buffer rings are closed before the ring they are registered with.
        for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
            ioUringBufferRing.close();
        }
        completeRingClose();
    }
367 
    // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
    // already been freed (and potentially reallocated by the OS). Because submitting events is gated on the
    // `eventfdAsyncNotify` flag we can close the gate but may need to read any outstanding events that have
    // been (or will be) written.
    private void drainEventFd() {
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
        assert !eventFdClosing;
        eventFdClosing = true;
        // Close the gate: with the flag set to true wakeup() will not write to the eventfd anymore.
        boolean eventPending = eventfdAsyncNotify.getAndSet(true);
        if (eventPending) {
            // There is an event that has been or will be written by another thread, so we must wait for the event.
            // Make sure we're actually listening for writes to the event fd.
            while (eventfdReadSubmitted == 0) {
                submitEventFdRead();
                submissionQueue.submit();
            }
            // Drain the eventfd of the pending wakeup.
            // The callback delegates to the regular handler and additionally records when the eventfd
            // completion was seen.
            class DrainFdEventCallback implements CompletionCallback {
                boolean eventFdDrained;

                @Override
                public boolean handle(int res, int flags, long udata) {
                    if (UserData.decodeId(udata) == EVENTFD_ID) {
                        eventFdDrained = true;
                    }
                    return IoUringIoHandler.this.handle(res, flags, udata);
                }
            }
            final DrainFdEventCallback handler = new DrainFdEventCallback();
            completionQueue.process(handler);
            // Keep submitting and processing until the eventfd completion showed up.
            while (!handler.eventFdDrained) {
                submissionQueue.submitAndGet();
                completionQueue.process(handler);
            }
        }
        // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
        // transition back to false, thus we should never have any more events written.
        // So, if we have a read event pending, we can cancel it.
        if (eventfdReadSubmitted != 0) {
            long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
            submissionQueue.addCancel(eventfdReadSubmitted, udata);
            eventfdReadSubmitted = 0;
            submissionQueue.submit();
        }
    }
414 
415     private void completeRingClose() {
416         if (closeCompleted) {
417             // already done.
418             return;
419         }
420         closeCompleted = true;
421         ringBuffer.close();
422         try {
423             eventfd.close();
424         } catch (IOException e) {
425             logger.warn("Failed to close eventfd", e);
426         }
427         eventfdReadBufCleanable.clean();
428         timeoutMemoryCleanable.clean();
429         iovArray.release();
430     }
431 
432     @Override
433     public IoRegistration register(IoHandle handle) throws Exception {
434         IoUringIoHandle ioHandle = cast(handle);
435         if (shuttingDown) {
436             throw new IllegalStateException("IoUringIoHandler is shutting down");
437         }
438         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
439         for (;;) {
440             int id = nextRegistrationId();
441             DefaultIoUringIoRegistration old = registrations.put(id, registration);
442             if (old != null) {
443                 assert old.handle != registration.handle;
444                 registrations.put(id, old);
445             } else {
446                 registration.setId(id);
447                 break;
448             }
449         }
450 
451         return registration;
452     }
453 
454     private int nextRegistrationId() {
455         int id;
456         do {
457             id = nextRegistrationId++;
458         } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
459         return id;
460     }
461 
462     private final class DefaultIoUringIoRegistration implements IoRegistration {
463         private final AtomicBoolean canceled = new AtomicBoolean();
464         private final ThreadAwareExecutor executor;
465         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
466         final IoUringIoHandle handle;
467 
468         private boolean removeLater;
469         private int outstandingCompletions;
470         private int id;
471 
472         DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
473             this.executor = executor;
474             this.handle = handle;
475         }
476 
477         void setId(int id) {
478             this.id = id;
479         }
480 
481         @Override
482         public long submit(IoOps ops) {
483             IoUringIoOps ioOps = (IoUringIoOps) ops;
484             if (!isValid()) {
485                 return INVALID_ID;
486             }
487             if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
488                 // Because we expect at least 1 completion per submission we can't support IOSQE_CQE_SKIP_SUCCESS
489                 // as it will only produce a completion on failure.
490                 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
491             }
492             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
493             if (executor.isExecutorThread(Thread.currentThread())) {
494                 submit0(ioOps, udata);
495             } else {
496                 executor.execute(() -> submit0(ioOps, udata));
497             }
498             return udata;
499         }
500 
501         private void submit0(IoUringIoOps ioOps, long udata) {
502             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
503                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
504                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
505             );
506             outstandingCompletions++;
507         }
508 
509         @SuppressWarnings("unchecked")
510         @Override
511         public <T> T attachment() {
512             return (T) IoUringIoHandler.this;
513         }
514 
515         @Override
516         public boolean isValid() {
517             return !canceled.get();
518         }
519 
520         @Override
521         public boolean cancel() {
522             if (!canceled.compareAndSet(false, true)) {
523                 // Already cancelled.
524                 return false;
525             }
526             if (executor.isExecutorThread(Thread.currentThread())) {
527                 tryRemove();
528             } else {
529                 executor.execute(this::tryRemove);
530             }
531             return true;
532         }
533 
534         private void tryRemove() {
535             if (outstandingCompletions > 0) {
536                 // We have some completions outstanding, we will remove the id <-> registration mapping
537                 // once these are done.
538                 removeLater = true;
539                 return;
540             }
541             remove();
542         }
543 
544         private void remove() {
545             DefaultIoUringIoRegistration old = registrations.remove(id);
546             assert old == this;
547         }
548 
549         void close() {
550             // Closing the handle will also cancel the registration.
551             // It's important that we not manually cancel as close() might need to submit some work to the ring.
552             assert executor.isExecutorThread(Thread.currentThread());
553             try {
554                 handle.close();
555             } catch (Exception e) {
556                 logger.debug("Exception during closing " + handle, e);
557             }
558         }
559 
560         void handle(int res, int flags, byte op, short data) {
561             event.update(res, flags, op, data);
562             handle.handle(this, event);
563             // Only decrement outstandingCompletions if IORING_CQE_F_MORE is not set as otherwise we know that we will
564             // receive more completions for the intial request.
565             if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
566                 // No more outstanding completions, remove the fd <-> registration mapping now.
567                 removeLater = false;
568                 remove();
569             }
570         }
571     }
572 
573     private static IoUringIoHandle cast(IoHandle handle) {
574         if (handle instanceof IoUringIoHandle) {
575             return (IoUringIoHandle) handle;
576         }
577         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
578     }
579 
580     @Override
581     public void wakeup() {
582         if (!executor.isExecutorThread(Thread.currentThread()) &&
583                 !eventfdAsyncNotify.getAndSet(true)) {
584             // write to the eventfd which will then trigger an eventfd read completion.
585             Native.eventFdWrite(eventfd.intValue(), 1L);
586         }
587     }
588 
589     @Override
590     public boolean isCompatible(Class<? extends IoHandle> handleType) {
591         return IoUringIoHandle.class.isAssignableFrom(handleType);
592     }
593 
594     IovArray iovArray() {
595         if (iovArray.isFull()) {
596             // Submit so we can reuse the iovArray.
597             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
598         }
599         assert iovArray.count() == 0;
600         return iovArray;
601     }
602 
603     /**
604      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
605      */
606     byte[] inet4AddressArray() {
607         return inet4AddressArray;
608     }
609 
610     /**
611      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
612      */
613     byte[] inet6AddressArray() {
614         return inet6AddressArray;
615     }
616 
617     /**
618      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
619      *
620      * @return factory
621      */
622     public static IoHandlerFactory newFactory() {
623         return newFactory(new IoUringIoHandlerConfig());
624     }
625 
626     /**
627      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
628      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
629      *
630      * @param  ringSize     the size of the ring.
631      * @return              factory
632      */
633     public static IoHandlerFactory newFactory(int ringSize) {
634         IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
635         configuration.setRingSize(ringSize);
636         return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
637     }
638 
639     /**
640      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
641      * Each {@link IoUringIoHandler} will use same option
642      * @param config the io_uring configuration
643      * @return factory
644      */
645     public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
646         IoUring.ensureAvailability();
647         ObjectUtil.checkNotNull(config, "config");
648         return eventLoop -> new IoUringIoHandler(eventLoop, config);
649     }
650 }