View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoHandle;
19  import io.netty.channel.IoHandler;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandlerFactory;
22  import io.netty.channel.IoOps;
23  import io.netty.channel.IoRegistration;
24  import io.netty.channel.unix.Buffer;
25  import io.netty.channel.unix.Errors;
26  import io.netty.channel.unix.FileDescriptor;
27  import io.netty.channel.unix.IovArray;
28  import io.netty.util.collection.IntObjectHashMap;
29  import io.netty.util.collection.IntObjectMap;
30  import io.netty.util.concurrent.ThreadAwareExecutor;
31  import io.netty.util.internal.CleanableDirectBuffer;
32  import io.netty.util.internal.ObjectUtil;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.io.UncheckedIOException;
39  import java.nio.ByteBuffer;
40  import java.util.ArrayList;
41  import java.util.Collection;
42  import java.util.List;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  import java.util.concurrent.atomic.AtomicInteger;
46  
47  import static java.lang.Math.max;
48  import static java.lang.Math.min;
49  import static java.util.Objects.requireNonNull;
50  
51  /**
52   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
53   */
54  public final class IoUringIoHandler implements IoHandler {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
    // NOTE(review): not referenced in this part of the file; presumably a marker bit used together with
    // wakeupWriters to close the wakeup gate -- confirm against closeWakeupGate().
    private static final int WAKEUP_CLOSED = 1 << 30;

    // The io_uring instance owned by this handler. Created in the constructor, closed in completeRingClose().
    private final RingBuffer ringBuffer;
    // Buffer rings created from the configuration, keyed by their buffer group id.
    private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
    // Live registrations keyed by registration id; used to route completions to their handle.
    private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
    // Scratch arrays sized to the maximum number of bytes of an Inet4Address / Inet6Address.
    private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
    private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];

    // Wakeup gate: wakeup() only proceeds when it flips this from false to true. It is reset to false when an
    // eventfd read completes and is left at true for good once drainEventFd() ran.
    private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
    // NOTE(review): not referenced in this part of the file; presumably counts in-flight wakeup writers -- confirm.
    private final AtomicInteger wakeupWriters = new AtomicInteger();
    // The eventfd that is read through the ring so other threads can wake up a blocked run().
    private final FileDescriptor eventfd;
    // Direct memory (Long.BYTES) that the eventfd read operation writes into, plus its native address.
    private final CleanableDirectBuffer eventfdReadBufCleanable;
    private final ByteBuffer eventfdReadBuf;
    private final long eventfdReadBufAddress;
    // Direct memory (KERNEL_TIMESPEC_SIZE) holding the timespec handed to timeout operations, plus its address.
    private final CleanableDirectBuffer timeoutMemoryCleanable;
    private final ByteBuffer timeoutMemory;
    private final long timeoutMemoryAddress;
    // Scratch memory that is cleared again after every submit call.
    private final IovArray iovArray;
    private final MsgHdrMemoryArray msgHdrMemoryArray;
    // Value returned by addEventFdRead(...) for the outstanding eventfd read; 0 means no read is outstanding.
    private long eventfdReadSubmitted;
    // Set by drainEventFd(); once true a completed eventfd read is not re-armed anymore.
    private boolean eventFdClosing;
    // Set by prepareToDestroy(); register(...) rejects new handles once this is true.
    private volatile boolean shuttingDown;
    // Set by completeRingClose(); run(...) becomes a no-op once this is true.
    private boolean closeCompleted;
    // Bookkeeping for slow-path submissions whose user data does not fit into the packed encoding.
    private final PendingOpMap pendingOps;
    // Next candidate registration id; always positive and wraps from Integer.MAX_VALUE back to 1.
    private int nextRegistrationId = 1;

    // Returned by IoRegistration.submit(...) when the registration is no longer valid.
    private static final long INVALID_ID = 0;
    // User data attached to the eventfd read operation.
    private static final long EVENTFD_TOKEN = PendingOpMap.token(1);
    // User data attached to handler-internal operations (timeouts and nops); their completions are ignored.
    private static final long RINGFD_TOKEN = PendingOpMap.token(2);
    private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec

    // Byte offsets of tv_sec / tv_nsec inside the timespec memory.
    private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
    private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;

    // Executor used to decide whether work can run inline or has to be handed over via execute(...).
    private final ThreadAwareExecutor executor;
92  
    /**
     * Creates the ring, applies the optional io-wq worker limits, registers the configured buffer rings and
     * allocates the eventfd and the native scratch memory used by this handler.
     *
     * @param executor the executor this handler is bound to
     * @param config   the configuration to create the ring from
     * @throws NullPointerException if {@code executor} or {@code config} is {@code null}
     * @throws UncheckedIOException if registering the io-wq worker limits or a buffer ring fails; the ring (and
     *                              any buffer ring created so far) is closed before the exception is thrown
     */
    IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
        // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
        IoUring.ensureAvailability();
        this.executor = requireNonNull(executor, "executor");
        requireNonNull(config, "config");
        int setupFlags = Native.setupFlags(config.singleIssuer());

        // By default the cq size is twice the ring size.
        // It is only overridden (together with IORING_SETUP_CQSIZE) when the user explicitly configured one.
        int cqSize = 2 * config.getRingSize();
        if (config.needSetupCqeSize()) {
            assert IoUring.isSetupCqeSizeSupported();
            setupFlags |= Native.IORING_SETUP_CQSIZE;
            cqSize = config.getCqSize();
        }
        this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
        if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
            // Negative configuration values are clamped to 0.
            int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
            int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
            int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
            if (result < 0) {
                // Close ringBuffer before throwing to ensure we release all memory on failure.
                ringBuffer.close();
                throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
            }
        }

        registeredIoUringBufferRing = new IntObjectHashMap<>();
        Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
        if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
            for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
                try {
                    IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
                    registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
                } catch (Errors.NativeIoException e) {
                    // Roll back every buffer ring that was created before the failing one.
                    for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
                        bufferRing.close();
                    }
                    // Close ringBuffer before throwing to ensure we release all memory on failure.
                    ringBuffer.close();
                    throw new UncheckedIOException(e);
                }
            }
        }

        registrations = new IntObjectHashMap<>();
        pendingOps = new PendingOpMap(IoUring.DEFAULT_PENDING_OPS_INITIAL_CAPACITY);
        eventfd = Native.newBlockingEventFd();
        // Memory the eventfd read writes its 8 byte counter into.
        eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
        eventfdReadBuf = eventfdReadBufCleanable.buffer();
        eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
        // Memory for the timespec that is shared by all timeout operations.
        timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
        timeoutMemory = timeoutMemoryCleanable.buffer();
        timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
        iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
        msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
    }
150 
151     @Override
152     public void initialize() {
153         ringBuffer.enable();
154         // Fill all buffer rings now.
155         for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
156             bufferRing.initialize();
157         }
158     }
159 
160     @Override
161     public int run(IoHandlerContext context) {
162         if (closeCompleted) {
163             if (context.shouldReportActiveIoTime()) {
164                 context.reportActiveIoTime(0);
165             }
166             return 0;
167         }
168         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
169         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
170         if (!completionQueue.hasCompletions() && context.canBlock()) {
171             if (eventfdReadSubmitted == 0) {
172                 submitEventFdRead();
173             }
174             long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
175             submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
176         } else {
177             // Even if we have some completions already pending we can still try to even fetch more.
178             submitAndClearNow(submissionQueue);
179         }
180 
181         int processed;
182         if (context.shouldReportActiveIoTime()) {
183             // Timer starts after the blocking wait, around the processing of completions.
184             long activeIoStartTimeNanos = System.nanoTime();
185             processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
186             long activeIoEndTimeNanos = System.nanoTime();
187             context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
188         } else {
189             processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
190         }
191         return processed;
192     }
193 
194     private boolean needSubmit(int sqFlags) {
195         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
196         return submissionQueue.count() > 0
197                 || (sqFlags & (Native.IORING_SQ_CQ_OVERFLOW | Native.IORING_SQ_TASKRUN)) != 0;
198     }
199 
200     private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
201                                          CompletionCallback callback) {
202         int processed = 0;
203         // Bound the maximum number of times this will loop before we return and so execute some non IO stuff.
204         // 128 here is just some sort of bound and another number might be ok as well.
205         for (int i = 0; i < 128; i++) {
206             int p = completionQueue.process(callback);
207             int sqFlags = submissionQueue.flags();
208             if ((sqFlags & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
209                 logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
210                         completionQueue.ringEntries);
211             }
212             if (p == 0) {
213                 if (!needSubmit(sqFlags)) {
214                     break;
215                 }
216                 submitAndClearNow0(submissionQueue);
217             }
218             processed += p;
219         }
220         return processed;
221     }
222 
223     private int submitAndClearNow(SubmissionQueue submissionQueue) {
224         if (needSubmit(submissionQueue.flags())) {
225             return submitAndClearNow0(submissionQueue);
226         }
227         return 0;
228     }
229 
230     private int submitAndClearNow0(SubmissionQueue submissionQueue) {
231 
232         int submitted = submissionQueue.submitAndGetNow();
233 
234         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
235         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
236         iovArray.clear();
237         msgHdrMemoryArray.clear();
238         return submitted;
239     }
240 
241     private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
242             throws Errors.NativeIoException {
243         short bufferRingSize = bufferRingConfig.bufferRingSize();
244         short bufferGroupId = bufferRingConfig.bufferGroupId();
245         int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
246         long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
247         if (ioUringBufRingAddr < 0) {
248             throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
249         }
250         return new IoUringBufferRing(ringFd,
251                 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
252                 bufferRingSize, bufferRingConfig.batchSize(),
253                 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
254                 bufferRingConfig.isBatchAllocation()
255         );
256     }
257 
258     IoUringBufferRing findBufferRing(short bgId) {
259         IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
260         if (cached != null) {
261             return cached;
262         }
263         throw new IllegalArgumentException(
264                 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
265         );
266     }
267 
268     private static void handleLoopException(Throwable throwable) {
269         logger.warn("Unexpected exception in the IO event loop.", throwable);
270 
271         // Prevent possible consecutive immediate failures that lead to
272         // excessive CPU consumption.
273         try {
274             Thread.sleep(100);
275         } catch (InterruptedException ignore) {
276             // ignore
277         }
278     }
279 
280     private void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
281         try {
282             if (udata == EVENTFD_TOKEN) {
283                 handleEventFdRead();
284                 return;
285             }
286             if (udata == RINGFD_TOKEN) {
287                 return;
288             }
289             if (udata >= 0) {
290                 handleFastPath(res, flags, udata, extraCqeData);
291                 return;
292             }
293             handleSlowPath(res, flags, udata, extraCqeData);
294         } catch (Error e) {
295             throw e;
296         } catch (Throwable throwable) {
297             handleLoopException(throwable);
298         }
299     }
300 
301     private void handleFastPath(int res, int flags, long udata, ByteBuffer extraCqeData) {
302         int id = UserData.decodeId(udata);
303         byte op = UserData.decodeOp(udata);
304         long userData = UserData.decodeData(udata);
305         DefaultIoUringIoRegistration registration = registrations.get(id);
306         if (registration != null) {
307             traceCompletion(registration, id, op, res);
308             registration.handle(res, flags, op, userData, extraCqeData);
309             return;
310         }
311         if (logger.isDebugEnabled()) {
312             logger.debug("ignoring packed completion for unknown registration (registrationId={}, op={}, userData={},"
313                             + " res={})",
314                     id, Native.opToStr(op), userData, res);
315         }
316     }
317 
318     private void handleSlowPath(int res, int flags, long udata, ByteBuffer extraCqeData) {
319         long sequence = PendingOpMap.tokenSequence(udata);
320         int slot = pendingOps.findSlot(udata);
321         if (slot != -1) {
322             int registrationId = pendingOps.registrationId(slot);
323             DefaultIoUringIoRegistration registration = registrations.get(registrationId);
324             byte op = pendingOps.op(slot);
325             long userData = pendingOps.userData(slot);
326 
327             // Recycle if this completion is terminal (no more CQEs expected for this SQE).
328             if ((flags & Native.IORING_CQE_F_MORE) == 0) {
329                 pendingOps.release(slot);
330             }
331 
332             // Resolve slow-path completions through the live registration table to align with the fast path.
333             if (registration != null) {
334                 traceCompletion(registration, registrationId, op, res);
335                 registration.handle(res, flags, op, userData, extraCqeData);
336                 return;
337             }
338             if (logger.isDebugEnabled()) {
339                 logger.debug("ignoring slow-path completion for missing registration (registrationId={}, seq={}, "
340                                 + "op={}, userData={}, res={})",
341                         registrationId, sequence, Native.opToStr(op), userData, res);
342             }
343             return;
344         }
345         if (logger.isDebugEnabled()) {
346             logger.debug("ignoring slow-path completion for unknown sequence (seq={}, res={})", sequence, res);
347         }
348     }
349 
350     private void traceCompletion(DefaultIoUringIoRegistration registration, int registrationId, byte op, int res) {
351         if (!logger.isTraceEnabled()) {
352             return;
353         }
354         int fd = registration.fd();
355         if (fd != -1) {
356             logger.trace("completed(ring {}): {}(fd={}, res={})",
357                     ringBuffer.fd(), Native.opToStr(op), fd, res);
358         } else {
359             logger.trace("completed(ring {}): {}(registrationId={}, res={})",
360                     ringBuffer.fd(), Native.opToStr(op), registrationId, res);
361         }
362     }
363 
364     private void handleEventFdRead() {
365         eventfdReadSubmitted = 0;
366         if (!eventFdClosing) {
367             eventfdAsyncNotify.set(false);
368             submitEventFdRead();
369         }
370     }
371 
372     private void submitEventFdRead() {
373         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
374         eventfdReadSubmitted = submissionQueue.addEventFdRead(
375                 eventfd.intValue(), eventfdReadBufAddress, 0, 8, EVENTFD_TOKEN);
376     }
377 
378     private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
379                                          boolean linkTimeout, long timeoutNanoSeconds) {
380         if (timeoutNanoSeconds != -1) {
381             // We use the same timespec pointer for all add*Timeout operations. This only works because we call
382             // submit directly after it. This ensures the submitted timeout is considered "stable" and so can be reused.
383             long seconds, nanoSeconds;
384             if (timeoutNanoSeconds == 0) {
385                 seconds = 0;
386                 nanoSeconds = 0;
387             } else {
388                 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
389                 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
390             }
391 
392             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
393             timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
394             if (linkTimeout) {
395                 submissionQueue.addLinkTimeout(timeoutMemoryAddress, RINGFD_TOKEN);
396             } else {
397                 submissionQueue.addTimeout(timeoutMemoryAddress, RINGFD_TOKEN);
398             }
399         }
400         int submitted = submissionQueue.submitAndGet();
401         // Clear the iovArray as we can re-use it now as things are considered stable after submission:
402         // See https://man7.org/linux/man-pages/man3/io_uring_prep_sendmsg.3.html
403         iovArray.clear();
404         msgHdrMemoryArray.clear();
405         return submitted;
406     }
407 
    /**
     * First phase of the shutdown: marks the handler as shutting down (so {@link #register(IoHandle)} rejects
     * new handles), closes every registration that is still present and processes the completions that result
     * from it.
     */
    @Override
    public void prepareToDestroy() {
        shuttingDown = true;
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();

        // Iterate over a copy as closing a registration may remove it from the registrations map.
        List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());

        for (DefaultIoUringIoRegistration registration: copy) {
            registration.close();
        }

        // Write to the eventfd to ensure that if we submitted a read for the eventfd we will see the completion event.
        Native.eventFdWrite(eventfd.intValue(), 1L);

        // Ensure all previously submitted IOs get to complete before tearing down everything.
        submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, RINGFD_TOKEN);

        // Submit everything and wait until we could drain it.
        submissionQueue.submitAndGet();

        // Handling a completion may queue new submissions, so keep submitting while completions show up.
        while (completionQueue.hasCompletions()) {
            processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
            if (submissionQueue.count() > 0) {
                submissionQueue.submitAndGetNow();
            }
        }
    }
436 
    /**
     * Final phase of the shutdown: drains the eventfd, waits a bounded amount of time for outstanding IO to
     * complete, closes all buffer rings and finally releases the ring and all native resources.
     */
    @Override
    public void destroy() {
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        drainEventFd();
        if (submissionQueue.remaining() < 2) {
            // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
            // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
            submissionQueue.submit();
        }
        // Try to drain all the IO from the queue first...
        // We need to also specify the Native.IOSQE_LINK flag for it to work as otherwise it is not correctly linked
        // with the timeout.
        // See:
        // - https://man7.org/linux/man-pages/man2/io_uring_enter.2.html
        // - https://git.kernel.dk/cgit/liburing/commit/?h=link-timeout&id=bc1bd5e97e2c758d6fd975bd35843b9b2c770c5a
        submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), RINGFD_TOKEN);
        // ... but only wait for 200 milliseconds on this
        submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
        // Process whatever completed in the meantime before the ring goes away.
        completionQueue.process(this::handle);
        for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
            ioUringBufferRing.close();
        }
        completeRingClose();
    }
462 
    // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
    // already been freed (and potentially reallocated by the OS). Because submitting wakeup events is gated on
    // the `eventfdAsyncNotify` flag we can close the gate, but may need to read any outstanding event that has
    // been (or will be) written.
    private void drainEventFd() {
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
        assert !eventFdClosing;
        eventFdClosing = true;
        // Close the gate. If it was closed already, somebody else claimed it before us and owes us a write.
        boolean eventPending = eventfdAsyncNotify.getAndSet(true);
        if (eventPending) {
            // There is an event that has been or will be written by another thread, so we must wait for the event.
            // Make sure we're actually listening for writes to the event fd.
            while (eventfdReadSubmitted == 0) {
                submitEventFdRead();
                submissionQueue.submit();
            }
            // Drain the eventfd of the pending wakeup.
            // The callback records when the eventfd completion was seen and otherwise delegates to the regular
            // completion handling.
            class DrainFdEventCallback implements CompletionCallback {
                boolean eventFdDrained;

                @Override
                public void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
                    if (udata == EVENTFD_TOKEN) {
                        eventFdDrained = true;
                    }
                    IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
                }
            }
            final DrainFdEventCallback handler = new DrainFdEventCallback();
            completionQueue.process(handler);
            // Keep submitting and processing until the completion of the eventfd read showed up.
            while (!handler.eventFdDrained) {
                submissionQueue.submitAndGet();
                processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
            }
        }
        // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
        // transition back to false, thus we should never have any more events written.
        // So, if we have a read event pending, we can cancel it.
        if (eventfdReadSubmitted != 0) {
            submissionQueue.addCancel(eventfdReadSubmitted, EVENTFD_TOKEN);
            eventfdReadSubmitted = 0;
            submissionQueue.submit();
        }
    }
508 
    /**
     * Closes the ring and releases the eventfd and all native memory owned by this handler.
     * Only the first call does any work; later calls return immediately.
     */
    private void completeRingClose() {
        if (closeCompleted) {
            // already done.
            return;
        }
        // Flag first so run(...) turns into a no-op from here on.
        closeCompleted = true;
        ringBuffer.close();
        closeWakeupGate();
        try {
            eventfd.close();
        } catch (IOException e) {
            // A failure to close the eventfd must not prevent the remaining memory from being released.
            logger.warn("Failed to close eventfd", e);
        }
        eventfdReadBufCleanable.clean();
        timeoutMemoryCleanable.clean();
        iovArray.release();
        msgHdrMemoryArray.release();
    }
527 
528     @Override
529     public IoRegistration register(IoHandle handle) throws Exception {
530         IoUringIoHandle ioHandle = cast(handle);
531         if (shuttingDown) {
532             throw new IllegalStateException("IoUringIoHandler is shutting down");
533         }
534         int startId = nextRegistrationId;
535         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
536         for (;;) {
537             int id = nextRegistrationId();
538             DefaultIoUringIoRegistration old = registrations.put(id, registration);
539             if (old != null) {
540                 assert old.handle != registration.handle;
541                 registrations.put(id, old);
542                 if (nextRegistrationId == startId) {
543                     throw new IllegalStateException("registration id space exhausted");
544                 }
545             } else {
546                 registration.setId(id);
547                 ioHandle.registered();
548                 break;
549             }
550         }
551 
552         return registration;
553     }
554 
555     private int nextRegistrationId() {
556         //registrationId must stay positive because id > 0
557         //it is used to distinguish normal fast-path completions from non-registration tokens.
558         int id = nextRegistrationId;
559         nextRegistrationId = id == Integer.MAX_VALUE ? 1 : id + 1;
560         return id;
561     }
562 
563     private final class DefaultIoUringIoRegistration implements IoRegistration {
564         private final AtomicBoolean canceled = new AtomicBoolean();
565         private final ThreadAwareExecutor executor;
566         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, 0L);
567         final IoUringIoHandle handle;
568 
569         private boolean removeLater;
570         private int outstandingCompletions;
571         private int id;
572 
573         DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
574             this.executor = executor;
575             this.handle = handle;
576         }
577 
578         void setId(int id) {
579             this.id = id;
580         }
581 
582         @Override
583         public long submit(IoOps ops) {
584             IoUringIoOps ioOps = (IoUringIoOps) ops;
585             if (!isValid()) {
586                 return INVALID_ID;
587             }
588             if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
589                 // Because we expect at least 1 completion per submission we can't support IOSQE_CQE_SKIP_SUCCESS
590                 // as it will only produce a completion on failure.
591                 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
592             }
593             long userData = ioOps.userData();
594             // Use the fast path when the full submission can still be encoded into packed UserData.
595             if (canUseFastPath(userData)) {
596                 long packedSeq = UserData.encode(id, ioOps.opcode(), (short) userData);
597                 if (executor.isExecutorThread(Thread.currentThread())) {
598                     submitFastPath0(ioOps, packedSeq);
599                 } else {
600                     executor.execute(() -> submitFastPath0(ioOps, packedSeq));
601                 }
602                 return packedSeq;
603             }
604             long token = pendingOps.nextToken();
605             if (executor.isExecutorThread(Thread.currentThread())) {
606                 submitSlowPath0(ioOps, token, userData);
607             } else {
608                 executor.execute(() -> submitSlowPath0(ioOps, token, userData));
609             }
610             return token;
611         }
612 
613         private void submitFastPath0(IoUringIoOps ioOps, long seq) {
614             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
615                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), seq,
616                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
617             );
618             outstandingCompletions++;
619         }
620 
621         private void submitSlowPath0(IoUringIoOps ioOps, long token, long userData) {
622             pendingOps.registerNormal(token, id, ioOps.opcode(), userData);
623             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
624                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), token,
625                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
626             );
627             outstandingCompletions++;
628         }
629 
630         private boolean canUseFastPath(long userData) {
631             return ((short) userData) == userData;
632         }
633 
634         private int fd() {
635             if (handle instanceof AbstractIoUringChannel) {
636                 return ((AbstractIoUringChannel) handle).fd().intValue();
637             }
638             return -1;
639         }
640 
641         @SuppressWarnings("unchecked")
642         @Override
643         public <T> T attachment() {
644             return (T) IoUringIoHandler.this;
645         }
646 
647         @Override
648         public boolean isValid() {
649             return !canceled.get();
650         }
651 
652         @Override
653         public boolean cancel() {
654             if (!canceled.compareAndSet(false, true)) {
655                 // Already cancelled.
656                 return false;
657             }
658             if (executor.isExecutorThread(Thread.currentThread())) {
659                 tryRemove();
660             } else {
661                 executor.execute(this::tryRemove);
662             }
663             return true;
664         }
665 
666         private void tryRemove() {
667             if (outstandingCompletions > 0) {
668                 // We have some completions outstanding, we will remove the id <-> registration mapping
669                 // once these are done.
670                 removeLater = true;
671                 return;
672             }
673             remove();
674         }
675 
676         private void remove() {
677             DefaultIoUringIoRegistration old = registrations.remove(id);
678             assert old == this;
679             handle.unregistered();
680         }
681 
682         void close() {
683             // Closing the handle will also cancel the registration.
684             // It's important that we not manually cancel as close() might need to submit some work to the ring.
685             assert executor.isExecutorThread(Thread.currentThread());
686             try {
687                 handle.close();
688             } catch (Exception e) {
689                 logger.debug("Exception during closing " + handle, e);
690             }
691         }
692 
693         void handle(int res, int flags, byte op, long userData, ByteBuffer extraCqeData) {
694             event.update(res, flags, op, userData, extraCqeData);
695             handle.handle(this, event);
696             // Only decrement outstandingCompletions if IORING_CQE_F_MORE is not set as otherwise we know that we will
697             // receive more completions for the intial request.
698             if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
699                 // No more outstanding completions, remove the registration now.
700                 removeLater = false;
701                 remove();
702             }
703         }
704     }
705 
706     private static IoUringIoHandle cast(IoHandle handle) {
707         if (handle instanceof IoUringIoHandle) {
708             return (IoUringIoHandle) handle;
709         }
710         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
711     }
712 
713     @Override
714     public void wakeup() {
715         if (!executor.isExecutorThread(Thread.currentThread()) &&
716             !eventfdAsyncNotify.getAndSet(true)) {
717             // Reserve a writer slot so the event-loop thread cannot close the eventfd while we are in the
718             // middle of eventFdWrite(). If the gate has already been closed (loop is being destroyed),
719             // simply drop the wakeup: there is no loop left to wake up, and writing to a closed (and
720             // possibly recycled) fd would either throw EBADF or, worse, hit an unrelated fd.
721             int s;
722             do {
723                 s = wakeupWriters.get();
724                 if ((s & WAKEUP_CLOSED) != 0) {
725                     return;
726                 }
727             } while (!wakeupWriters.compareAndSet(s, s + 1));
728             try {
729                 // write to the eventfd which will then trigger an eventfd read completion.
730                 Native.eventFdWrite(eventfd.intValue(), 1L);
731             } finally {
732                 wakeupWriters.decrementAndGet();
733             }
734         }
735     }
736 
737     private void closeWakeupGate() {
738         int s;
739         do {
740             s = wakeupWriters.get();
741         } while (!wakeupWriters.compareAndSet(s, s | WAKEUP_CLOSED));
742         // Wait for any thread still inside eventFdWrite() to leave. eventFdWrite is a single write(2)
743         // syscall on an eventfd, so this spin is bounded to a few microseconds in practice.
744         while ((wakeupWriters.get() & ~WAKEUP_CLOSED) != 0) {
745             Thread.onSpinWait();
746         }
747     }
748 
749     @Override
750     public boolean isCompatible(Class<? extends IoHandle> handleType) {
751         return IoUringIoHandle.class.isAssignableFrom(handleType);
752     }
753 
754     IovArray iovArray() {
755         if (iovArray.isFull()) {
756             // Submit so we can reuse the iovArray.
757             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
758             assert iovArray.count() == 0;
759         }
760         return iovArray;
761     }
762 
763     MsgHdrMemoryArray msgHdrMemoryArray() {
764         if (msgHdrMemoryArray.isFull()) {
765             // Submit so we can reuse the msgHdrArray.
766             submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
767         }
768         return msgHdrMemoryArray;
769     }
770 
771     /**
772      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
773      */
774     byte[] inet4AddressArray() {
775         return inet4AddressArray;
776     }
777 
778     /**
779      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
780      */
781     byte[] inet6AddressArray() {
782         return inet6AddressArray;
783     }
784 
785     /**
786      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
787      *
788      * @return factory
789      */
790     public static IoHandlerFactory newFactory() {
791         return newFactory(new IoUringIoHandlerConfig());
792     }
793 
794     /**
795      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
796      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
797      *
798      * @param  ringSize     the size of the ring.
799      * @return              factory
800      */
801     public static IoHandlerFactory newFactory(int ringSize) {
802         IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
803         configuration.setRingSize(ringSize);
804         return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
805     }
806 
807     /**
808      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
809      * Each {@link IoUringIoHandler} will use same option
810      * @param config the io_uring configuration
811      * @return factory
812      */
813     public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
814         IoUring.ensureAvailability();
815         final IoUringIoHandlerConfig copy = ObjectUtil.checkNotNull(config, "config").verifyAndClone();
816         return new IoHandlerFactory() {
817             @Override
818             public IoHandler newHandler(ThreadAwareExecutor eventLoop) {
819                 return new IoUringIoHandler(eventLoop, copy);
820             }
821 
822             @Override
823             public boolean isChangingThreadSupported() {
824                 return !copy.singleIssuer();
825             }
826         };
827     }
828 }