View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoHandlerContext;
19  import io.netty.channel.IoHandle;
20  import io.netty.channel.IoHandler;
21  import io.netty.channel.IoHandlerFactory;
22  import io.netty.channel.IoOps;
23  import io.netty.channel.IoRegistration;
24  import io.netty.channel.unix.Errors;
25  import io.netty.channel.unix.FileDescriptor;
26  import io.netty.util.collection.IntObjectHashMap;
27  import io.netty.util.collection.IntObjectMap;
28  import io.netty.util.concurrent.ThreadAwareExecutor;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.PlatformDependent;
31  import io.netty.util.internal.StringUtil;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.io.IOException;
36  import java.io.UncheckedIOException;
37  import java.util.ArrayList;
38  import java.util.Collection;
39  import java.util.List;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  
43  import static java.lang.Math.max;
44  import static java.lang.Math.min;
45  import static java.util.Objects.requireNonNull;
46  
47  /**
48   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
49   */
50  public final class IoUringIoHandler implements IoHandler {
51      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
52      private static final short RING_CLOSE = 1;
53  
54      private final RingBuffer ringBuffer;
55      private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
56      private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
57      // The maximum number of bytes for an InetAddress / Inet6Address
58      private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
59      private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
60  
61      private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
62      private final FileDescriptor eventfd;
63      private final long eventfdReadBuf;
64      private final long timeoutMemoryAddress;
65  
66      private long eventfdReadSubmitted;
67      private boolean eventFdClosing;
68      private volatile boolean shuttingDown;
69      private boolean closeCompleted;
70      private int nextRegistrationId = Integer.MIN_VALUE;
71  
72      // these two ids are used internally any so can't be used by nextRegistrationId().
73      private static final int EVENTFD_ID = Integer.MAX_VALUE;
74      private static final int RINGFD_ID = EVENTFD_ID - 1;
75      private static final int INVALID_ID = 0;
76  
77      private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
78  
79      private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
80      private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
81  
82      private final CompletionBuffer completionBuffer;
83      private final ThreadAwareExecutor executor;
84  
85      IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
86          // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
87          IoUring.ensureAvailability();
88          this.executor = requireNonNull(executor, "executor");
89          requireNonNull(config, "config");
90          int setupFlags = Native.setupFlags();
91  
92          //The default cq size is always twice the ringSize.
93          // It only makes sense when the user actually specifies the cq ring size.
94          int cqSize = 2 * config.getRingSize();
95          if (config.needSetupCqeSize()) {
96              if (!IoUring.isSetupCqeSizeSupported()) {
97                  throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
98              }
99              setupFlags |= Native.IORING_SETUP_CQSIZE;
100             cqSize = config.checkCqSize(config.getCqSize());
101         }
102         this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
103         if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
104             int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
105             int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
106             int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
107             if (result < 0) {
108                 // Close ringBuffer before throwing to ensure we release all memory on failure.
109                 ringBuffer.close();
110                 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
111             }
112         }
113 
114         registeredIoUringBufferRing = new IntObjectHashMap<>();
115         Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
116         if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
117             if (!IoUring.isRegisterBufferRingSupported()) {
118                 // Close ringBuffer before throwing to ensure we release all memory on failure.
119                 ringBuffer.close();
120                 throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
121             }
122             for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
123                 try {
124                     IoUringBufferRing ring = newBufferRing(bufferRingConfig);
125                     registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
126                 } catch (Errors.NativeIoException e) {
127                     for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
128                         bufferRing.close();
129                     }
130                     // Close ringBuffer before throwing to ensure we release all memory on failure.
131                     ringBuffer.close();
132                     throw new UncheckedIOException(e);
133                 }
134             }
135         }
136 
137         registrations = new IntObjectHashMap<>();
138         eventfd = Native.newBlockingEventFd();
139         eventfdReadBuf = PlatformDependent.allocateMemory(8);
140         this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE);
141 
142         // We buffer a maximum of 2 * CompletionQueue.ringCapacity completions before we drain them in batches.
143         // Also as we never submit an udata which is 0L we use this as the tombstone marker.
144         completionBuffer = new CompletionBuffer(ringBuffer.ioUringCompletionQueue().ringCapacity * 2, 0);
145     }
146 
147     @Override
148     public void initialize() {
149         ringBuffer.enable();
150         // Fill all buffer rings now.
151         for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
152             bufferRing.fill();
153         }
154     }
155 
156     @Override
157     public int run(IoHandlerContext context) {
158         int processedPerRun = 0;
159         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
160         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
161         if (!completionQueue.hasCompletions() && context.canBlock()) {
162             if (eventfdReadSubmitted == 0) {
163                 submitEventFdRead();
164             }
165             long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
166             submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
167         } else {
168             submissionQueue.submit();
169         }
170         for (;;) {
171             // we might call submitAndRunNow() while processing stuff in the completionArray we need to
172             // add the processed completions to processedPerRun.
173             int processed = drainAndProcessAll(completionQueue, this::handle);
174             processedPerRun += processed;
175 
176             // Let's submit again.
177             // If we were not able to submit anything and there was nothing left in the completionBuffer we will
178             // break out of the loop and return to the caller.
179             if (submissionQueue.submit() == 0 && processed == 0) {
180                 break;
181             }
182         }
183 
184         return processedPerRun;
185     }
186 
187     void submitAndRunNow(long udata) {
188         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
189         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
190         if (submissionQueue.submit() > 0) {
191             completionBuffer.drain(completionQueue);
192             completionBuffer.processOneNow(this::handle, udata);
193         }
194     }
195 
196     IoUringBufferRing newBufferRing(IoUringBufferRingConfig bufferRingConfig) throws Errors.NativeIoException {
197         int ringFd = ringBuffer.fd();
198         short bufferRingSize = bufferRingConfig.bufferRingSize();
199         short bufferGroupId = bufferRingConfig.bufferGroupId();
200         int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
201         long ioUringBufRingAddr = Native.ioUringRegisterBuffRing(ringFd, bufferRingSize, bufferGroupId, flags);
202         if (ioUringBufRingAddr < 0) {
203             throw Errors.newIOException("ioUringRegisterBuffRing", (int) ioUringBufRingAddr);
204         }
205         return new IoUringBufferRing(ringFd, ioUringBufRingAddr,
206                 bufferRingSize, bufferRingConfig.maxUnreleasedBuffers(),
207                 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator()
208         );
209     }
210 
211     IoUringBufferRing findBufferRing(short bgId) {
212         IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
213         if (cached != null) {
214             return cached;
215         }
216         throw new IllegalArgumentException(
217                 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
218         );
219     }
220 
221     private int drainAndProcessAll(CompletionQueue completionQueue, CompletionCallback callback) {
222         int processed = 0;
223         for (;;) {
224             boolean drainedAll = completionBuffer.drain(completionQueue);
225             processed += completionBuffer.processNow(callback);
226             if (drainedAll) {
227                 break;
228             }
229         }
230         return processed;
231     }
232 
233     private static void handleLoopException(Throwable throwable) {
234         logger.warn("Unexpected exception in the IO event loop.", throwable);
235 
236         // Prevent possible consecutive immediate failures that lead to
237         // excessive CPU consumption.
238         try {
239             Thread.sleep(100);
240         } catch (InterruptedException ignore) {
241             // ignore
242         }
243     }
244 
245     private boolean handle(int res, int flags, long udata) {
246         try {
247             int id = UserData.decodeId(udata);
248             byte op = UserData.decodeOp(udata);
249             short data = UserData.decodeData(udata);
250 
251             if (logger.isTraceEnabled()) {
252                 logger.trace("completed(ring {}): {}(id={}, res={})",
253                         ringBuffer.fd(), Native.opToStr(op), data, res);
254             }
255             if (id == EVENTFD_ID) {
256                 handleEventFdRead();
257                 return true;
258             }
259             if (id == RINGFD_ID) {
260                 if (op == Native.IORING_OP_NOP && data == RING_CLOSE) {
261                     completeRingClose();
262                 }
263                 return true;
264             }
265             DefaultIoUringIoRegistration registration = registrations.get(id);
266             if (registration == null) {
267                 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
268                         Native.opToStr(op), id, res);
269                 return true;
270             }
271             registration.handle(res, flags, op, data);
272             return true;
273         } catch (Error e) {
274             throw e;
275         } catch (Throwable throwable) {
276             handleLoopException(throwable);
277             return true;
278         }
279     }
280 
281     private void handleEventFdRead() {
282         eventfdReadSubmitted = 0;
283         if (!eventFdClosing) {
284             eventfdAsyncNotify.set(false);
285             submitEventFdRead();
286         }
287     }
288 
289     private void submitEventFdRead() {
290         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
291         long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
292 
293         eventfdReadSubmitted = submissionQueue.addEventFdRead(
294                 eventfd.intValue(), eventfdReadBuf, 0, 8, udata);
295     }
296 
297     private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
298                                          boolean linkTimeout, long timeoutNanoSeconds) {
299         if (timeoutNanoSeconds != -1) {
300             long udata = UserData.encode(RINGFD_ID,
301                     linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
302             // We use the same timespec pointer for all add*Timeout operations. This only works because we call
303             // submit directly after it. This ensures the submitted timeout is considered "stable" and so can be reused.
304             long seconds, nanoSeconds;
305             if (timeoutNanoSeconds == 0) {
306                 seconds = 0;
307                 nanoSeconds = 0;
308             } else {
309                 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
310                 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
311             }
312 
313             PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
314             PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
315 
316             if (linkTimeout) {
317                 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
318             } else {
319                 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
320             }
321         }
322         return submissionQueue.submitAndWait();
323     }
324 
325     @Override
326     public void prepareToDestroy() {
327         shuttingDown = true;
328         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
329         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
330 
331         List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
332 
333         for (DefaultIoUringIoRegistration registration: copy) {
334             registration.close();
335         }
336 
337         // Write to the eventfd to ensure that if we submitted a read for the eventfd we will see the completion event.
338         Native.eventFdWrite(eventfd.intValue(), 1L);
339 
340         // Ensure all previously submitted IOs get to complete before tearing down everything.
341         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
342         submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
343 
344         // Submit everything and wait until we could drain i.
345         submissionQueue.submitAndWait();
346         while (completionQueue.hasCompletions()) {
347             completionQueue.process(this::handle);
348 
349             if (submissionQueue.count() > 0) {
350                 submissionQueue.submit();
351             }
352         }
353     }
354 
355     @Override
356     public void destroy() {
357         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
358         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
359         drainEventFd();
360         if (submissionQueue.remaining() < 2) {
361             // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
362             // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
363             submissionQueue.submit();
364         }
365         // Try to drain all the IO from the queue first...
366         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
367         // We need to also specify the Native.IOSQE_LINK flag for it to work as otherwise it is not correctly linked
368         // with the timeout.
369         // See:
370         // - https://man7.org/linux/man-pages/man2/io_uring_enter.2.html
371         // - https://git.kernel.dk/cgit/liburing/commit/?h=link-timeout&id=bc1bd5e97e2c758d6fd975bd35843b9b2c770c5a
372         submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
373         // ... but only wait for 200 milliseconds on this
374         submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
375         completionQueue.process(this::handle);
376         for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
377             ioUringBufferRing.close();
378         }
379         completeRingClose();
380     }
381 
382     // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
383     // already been freed (and potentially reallocated by the OS). Because submitted events is gated on the
384     // `eventfdAsyncNotify` flag we can close the gate but may need to read any outstanding events that have
385     // (or will) be written.
386     private void drainEventFd() {
387         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
388         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
389         assert !eventFdClosing;
390         eventFdClosing = true;
391         boolean eventPending = eventfdAsyncNotify.getAndSet(true);
392         if (eventPending) {
393             // There is an event that has been or will be written by another thread, so we must wait for the event.
394             // Make sure we're actually listening for writes to the event fd.
395             while (eventfdReadSubmitted == 0) {
396                 submitEventFdRead();
397                 submissionQueue.submit();
398             }
399             // Drain the eventfd of the pending wakup.
400             class DrainFdEventCallback implements CompletionCallback {
401                 boolean eventFdDrained;
402 
403                 @Override
404                 public boolean handle(int res, int flags, long udata) {
405                     if (UserData.decodeId(udata) == EVENTFD_ID) {
406                         eventFdDrained = true;
407                     }
408                     return IoUringIoHandler.this.handle(res, flags, udata);
409                 }
410             }
411             final DrainFdEventCallback handler = new DrainFdEventCallback();
412             drainAndProcessAll(completionQueue, handler);
413             completionQueue.process(handler);
414             while (!handler.eventFdDrained) {
415                 submissionQueue.submitAndWait();
416                 drainAndProcessAll(completionQueue, handler);
417             }
418         }
419         // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
420         // transition back to false, thus we should never have any more events written.
421         // So, if we have a read event pending, we can cancel it.
422         if (eventfdReadSubmitted != 0) {
423             long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
424             submissionQueue.addCancel(eventfdReadSubmitted, udata);
425             eventfdReadSubmitted = 0;
426             submissionQueue.submit();
427         }
428     }
429 
430     private void completeRingClose() {
431         if (closeCompleted) {
432             // already done.
433             return;
434         }
435         closeCompleted = true;
436         ringBuffer.close();
437         try {
438             eventfd.close();
439         } catch (IOException e) {
440             logger.warn("Failed to close eventfd", e);
441         }
442         PlatformDependent.freeMemory(eventfdReadBuf);
443         PlatformDependent.freeMemory(timeoutMemoryAddress);
444     }
445 
446     @Override
447     public IoRegistration register(IoHandle handle) throws Exception {
448         IoUringIoHandle ioHandle = cast(handle);
449         if (shuttingDown) {
450             throw new IllegalStateException("IoUringIoHandler is shutting down");
451         }
452         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
453         for (;;) {
454             int id = nextRegistrationId();
455             DefaultIoUringIoRegistration old = registrations.put(id, registration);
456             if (old != null) {
457                 assert old.handle != registration.handle;
458                 registrations.put(id, old);
459             } else {
460                 registration.setId(id);
461                 break;
462             }
463         }
464 
465         return registration;
466     }
467 
468     private int nextRegistrationId() {
469         int id;
470         do {
471             id = nextRegistrationId++;
472         } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
473         return id;
474     }
475 
476     private final class DefaultIoUringIoRegistration implements IoRegistration {
477         private final AtomicBoolean canceled = new AtomicBoolean();
478         private final ThreadAwareExecutor executor;
479         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
480         final IoUringIoHandle handle;
481 
482         private boolean removeLater;
483         private int outstandingCompletions;
484         private int id;
485 
486         DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
487             this.executor = executor;
488             this.handle = handle;
489         }
490 
491         void setId(int id) {
492             this.id = id;
493         }
494 
495         @Override
496         public long submit(IoOps ops) {
497             IoUringIoOps ioOps = (IoUringIoOps) ops;
498             if (!isValid()) {
499                 return INVALID_ID;
500             }
501             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
502             if (executor.isExecutorThread(Thread.currentThread())) {
503                 submit0(ioOps, udata);
504             } else {
505                 executor.execute(() -> submit0(ioOps, udata));
506             }
507             return udata;
508         }
509 
510         private void submit0(IoUringIoOps ioOps, long udata) {
511             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
512                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
513                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
514             );
515             outstandingCompletions++;
516         }
517 
518         @SuppressWarnings("unchecked")
519         @Override
520         public <T> T attachment() {
521             return (T) IoUringIoHandler.this;
522         }
523 
524         @Override
525         public boolean isValid() {
526             return !canceled.get();
527         }
528 
529         @Override
530         public boolean cancel() {
531             if (!canceled.compareAndSet(false, true)) {
532                 // Already cancelled.
533                 return false;
534             }
535             if (executor.isExecutorThread(Thread.currentThread())) {
536                 tryRemove();
537             } else {
538                 executor.execute(this::tryRemove);
539             }
540             return true;
541         }
542 
543         private void tryRemove() {
544             if (outstandingCompletions > 0) {
545                 // We have some completions outstanding, we will remove the id <-> registration mapping
546                 // once these are done.
547                 removeLater = true;
548                 return;
549             }
550             remove();
551         }
552 
553         private void remove() {
554             DefaultIoUringIoRegistration old = registrations.remove(id);
555             assert old == this;
556         }
557 
558         void close() {
559             // Closing the handle will also cancel the registration.
560             // It's important that we not manually cancel as close() might need to submit some work to the ring.
561             assert executor.isExecutorThread(Thread.currentThread());
562             try {
563                 handle.close();
564             } catch (Exception e) {
565                 logger.debug("Exception during closing " + handle, e);
566             }
567         }
568 
569         void handle(int res, int flags, byte op, short data) {
570             event.update(res, flags, op, data);
571             handle.handle(this, event);
572             if (--outstandingCompletions == 0 && removeLater) {
573                 // No more outstanding completions, remove the fd <-> registration mapping now.
574                 removeLater = false;
575                 remove();
576             }
577         }
578     }
579 
580     private static IoUringIoHandle cast(IoHandle handle) {
581         if (handle instanceof IoUringIoHandle) {
582             return (IoUringIoHandle) handle;
583         }
584         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
585     }
586 
587     @Override
588     public void wakeup() {
589         if (!executor.isExecutorThread(Thread.currentThread()) &&
590                 !eventfdAsyncNotify.getAndSet(true)) {
591             // write to the eventfd which will then trigger an eventfd read completion.
592             Native.eventFdWrite(eventfd.intValue(), 1L);
593         }
594     }
595 
596     @Override
597     public boolean isCompatible(Class<? extends IoHandle> handleType) {
598         return IoUringIoHandle.class.isAssignableFrom(handleType);
599     }
600 
601     /**
602      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
603      */
604     byte[] inet4AddressArray() {
605         return inet4AddressArray;
606     }
607 
608     /**
609      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
610      */
611     byte[] inet6AddressArray() {
612         return inet6AddressArray;
613     }
614 
615     /**
616      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
617      *
618      * @return factory
619      */
620     public static IoHandlerFactory newFactory() {
621         return newFactory(new IoUringIoHandlerConfig());
622     }
623 
624     /**
625      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
626      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
627      *
628      * @param  ringSize     the size of the ring.
629      * @return              factory
630      */
631     public static IoHandlerFactory newFactory(int ringSize) {
632         IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
633         configuration.setRingSize(ringSize);
634         return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
635     }
636 
637     /**
638      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
639      * Each {@link IoUringIoHandler} will use same option
640      * @param config the io_uring configuration
641      * @return factory
642      */
643     public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
644         IoUring.ensureAvailability();
645         ObjectUtil.checkNotNull(config, "config");
646         return eventLoop -> new IoUringIoHandler(eventLoop, config);
647     }
648 }