View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoEventLoop;
19  import io.netty.channel.IoExecutionContext;
20  import io.netty.channel.IoHandle;
21  import io.netty.channel.IoHandler;
22  import io.netty.channel.IoHandlerFactory;
23  import io.netty.channel.IoOps;
24  import io.netty.channel.IoRegistration;
25  import io.netty.channel.unix.Errors;
26  import io.netty.channel.unix.FileDescriptor;
27  import io.netty.util.collection.IntObjectHashMap;
28  import io.netty.util.collection.IntObjectMap;
29  import io.netty.util.concurrent.Future;
30  import io.netty.util.concurrent.Promise;
31  import io.netty.util.internal.ObjectUtil;
32  import io.netty.util.internal.PlatformDependent;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.io.UncheckedIOException;
39  import java.util.ArrayList;
40  import java.util.List;
41  import java.util.concurrent.RejectedExecutionException;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  
45  import static java.util.Objects.requireNonNull;
46  
47  /**
48   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
49   */
50  public final class IoUringIoHandler implements IoHandler {
51  
52      // Special IoUringIoOps that will cause a submission and running of all completions.
53      static final IoUringIoOps SUBMIT_AND_RUN_ALL = new IoUringIoOps(
54              (byte) -1, (byte) -1, (short) -1, -1, -1, -1, -1, -1, (short) -1, (short) -1, (short) -1, -1, -1);
55  
56      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
57      private static final short RING_CLOSE = 1;
58  
59      private final RingBuffer ringBuffer;
60      private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
61      // The maximum number of bytes for an InetAddress / Inet6Address
62      private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
63      private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
64  
65      private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
66      private final FileDescriptor eventfd;
67      private final long eventfdReadBuf;
68  
69      private long eventfdReadSubmitted;
70      private boolean eventFdClosing;
71      private volatile boolean shuttingDown;
72      private boolean closeCompleted;
73      private int nextRegistrationId = Integer.MIN_VALUE;
74      private int processedPerRun;
75  
76      // these two ids are used internally any so can't be used by nextRegistrationId().
77      private static final int EVENTFD_ID = Integer.MAX_VALUE;
78      private static final int RINGFD_ID = EVENTFD_ID - 1;
79      private static final int INVALID_ID = 0;
80  
81      private final CompletionBuffer completionBuffer;
82  
83      IoUringIoHandler(IoUringIoHandlerConfiguration config) {
84          // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
85          IoUring.ensureAvailability();
86          requireNonNull(config, "config");
87          this.ringBuffer = Native.createRingBuffer(config.getRingSize());
88          if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
89              int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
90              int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
91              int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
92              if (result < 0) {
93                  // Close ringBuffer before throwing to ensure we release all memory on failure.
94                  ringBuffer.close();
95                  throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
96              }
97          }
98          registrations = new IntObjectHashMap<>();
99          eventfd = Native.newBlockingEventFd();
100         eventfdReadBuf = PlatformDependent.allocateMemory(8);
101 
102         // We buffer a maximum of 2 * CompletionQueue.ringSize completions before we drain them in batches.
103         // Also as we never submit an udata which is 0L we use this as the tombstone marker.
104         completionBuffer = new CompletionBuffer(ringBuffer.ioUringCompletionQueue().ringSize * 2, 0);
105     }
106 
107     @Override
108     public int run(IoExecutionContext context) {
109         processedPerRun = 0;
110         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
111         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
112         if (!completionQueue.hasCompletions() && context.canBlock()) {
113             if (eventfdReadSubmitted == 0) {
114                 submitEventFdRead();
115             }
116             if (context.deadlineNanos() != -1) {
117                 submitTimeout(context);
118             }
119             submissionQueue.submitAndWait();
120         } else {
121             submissionQueue.submit();
122         }
123         // we might call submitAndRunNow() while processing stuff in the completionArray we need to
124         // add the processed completions to processedPerRun as this might also be updated by submitAndRunNow()
125         processedPerRun += drainAndProcessAll(completionQueue, this::handle);
126 
127         // Let's submit one more time as the completions might have added things to the submission queue.
128         submissionQueue.submit();
129 
130         return processedPerRun;
131     }
132 
133     private void submitAndRunNow() {
134         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
135         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
136         if (submissionQueue.submit() > 0) {
137             processedPerRun += drainAndProcessAll(completionQueue, this::handle);
138         }
139     }
140 
141     private int drainAndProcessAll(CompletionQueue completionQueue, CompletionCallback callback) {
142         int processed = 0;
143         for (;;) {
144             boolean drainedAll = completionBuffer.drain(completionQueue);
145             processed += completionBuffer.processNow(callback);
146             if (drainedAll) {
147                 break;
148             }
149         }
150         return processed;
151     }
152 
153     private static void handleLoopException(Throwable throwable) {
154         logger.warn("Unexpected exception in the IO event loop.", throwable);
155 
156         // Prevent possible consecutive immediate failures that lead to
157         // excessive CPU consumption.
158         try {
159             Thread.sleep(100);
160         } catch (InterruptedException ignore) {
161             // ignore
162         }
163     }
164 
165     private boolean handle(int res, int flags, long udata) {
166         try {
167             int id = UserData.decodeId(udata);
168             byte op = UserData.decodeOp(udata);
169             short data = UserData.decodeData(udata);
170 
171             if (logger.isTraceEnabled()) {
172                 logger.trace("completed(ring {}): {}(id={}, res={})",
173                         ringBuffer.fd(), Native.opToStr(op), data, res);
174             }
175             if (id == EVENTFD_ID) {
176                 handleEventFdRead();
177                 return true;
178             }
179             if (id == RINGFD_ID) {
180                 if (op == Native.IORING_OP_NOP && data == RING_CLOSE) {
181                     completeRingClose();
182                 }
183                 return true;
184             }
185             DefaultIoUringIoRegistration registration = registrations.get(id);
186             if (registration == null) {
187                 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
188                         Native.opToStr(op), id, res);
189                 return true;
190             }
191             registration.handle(res, flags, op, data);
192             return true;
193         } catch (Error e) {
194             throw e;
195         } catch (Throwable throwable) {
196             handleLoopException(throwable);
197             return true;
198         }
199     }
200 
201     private void handleEventFdRead() {
202         eventfdReadSubmitted = 0;
203         if (!eventFdClosing) {
204             eventfdAsyncNotify.set(false);
205             submitEventFdRead();
206         }
207     }
208 
209     private void submitEventFdRead() {
210         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
211         long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
212 
213         eventfdReadSubmitted = submissionQueue.addEventFdRead(
214                 eventfd.intValue(), eventfdReadBuf, 0, 8, udata);
215     }
216 
217     private void submitTimeout(IoExecutionContext context) {
218         long delayNanos = context.delayNanos(System.nanoTime());
219         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_TIMEOUT, (short) 0);
220 
221         ringBuffer.ioUringSubmissionQueue().addTimeout(ringBuffer.fd(), delayNanos, udata);
222     }
223 
224     @Override
225     public void prepareToDestroy() {
226         shuttingDown = true;
227         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
228         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
229 
230         List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
231 
232         for (DefaultIoUringIoRegistration registration: copy) {
233             registration.close();
234         }
235 
236         // Ensure all previously submitted IOs get to complete before tearing down everything.
237         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
238         submissionQueue.addNop(ringBuffer.fd(), (byte) Native.IOSQE_IO_DRAIN, udata);
239         submissionQueue.submit();
240         while (completionQueue.hasCompletions()) {
241             completionQueue.process(this::handle);
242 
243             if (submissionQueue.count() > 0) {
244                 submissionQueue.submit();
245             }
246         }
247     }
248 
249     @Override
250     public void destroy() {
251         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
252         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
253         drainEventFd();
254         if (submissionQueue.remaining() < 2) {
255             // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
256             // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
257             submissionQueue.submit();
258         }
259         // Try to drain all the IO from the queue first...
260         long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
261         submissionQueue.addNop(ringBuffer.fd(), (byte) Native.IOSQE_IO_DRAIN, udata);
262         // ... but only wait for 200 milliseconds on this
263         udata = UserData.encode(RINGFD_ID, Native.IORING_OP_LINK_TIMEOUT, (short) 0);
264         submissionQueue.addLinkTimeout(ringBuffer.fd(), TimeUnit.MILLISECONDS.toNanos(200), udata);
265         submissionQueue.submitAndWait();
266         completionQueue.process(this::handle);
267         completeRingClose();
268     }
269 
270     // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
271     // already been freed (and potentially reallocated by the OS). Because submitted events is gated on the
272     // `eventfdAsyncNotify` flag we can close the gate but may need to read any outstanding events that have
273     // (or will) be written.
274     private void drainEventFd() {
275         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
276         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
277         assert !eventFdClosing;
278         eventFdClosing = true;
279         boolean eventPending = eventfdAsyncNotify.getAndSet(true);
280         if (eventPending) {
281             // There is an event that has been or will be written by another thread, so we must wait for the event.
282             // Make sure we're actually listening for writes to the event fd.
283             while (eventfdReadSubmitted == 0) {
284                 submitEventFdRead();
285                 submissionQueue.submit();
286             }
287             // Drain the eventfd of the pending wakup.
288             class DrainFdEventCallback implements CompletionCallback {
289                 boolean eventFdDrained;
290 
291                 @Override
292                 public boolean handle(int res, int flags, long udata) {
293                     if (UserData.decodeId(udata) == EVENTFD_ID) {
294                         eventFdDrained = true;
295                     }
296                     return IoUringIoHandler.this.handle(res, flags, udata);
297                 }
298             }
299             final DrainFdEventCallback handler = new DrainFdEventCallback();
300             drainAndProcessAll(completionQueue, handler);
301             completionQueue.process(handler);
302             while (!handler.eventFdDrained) {
303                 submissionQueue.submitAndWait();
304                 drainAndProcessAll(completionQueue, handler);
305             }
306         }
307         // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
308         // transition back to false, thus we should never have any more events written.
309         // So, if we have a read event pending, we can cancel it.
310         if (eventfdReadSubmitted != 0) {
311             long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
312             submissionQueue.addCancel(eventfd.intValue(), eventfdReadSubmitted, udata);
313             eventfdReadSubmitted = 0;
314             submissionQueue.submit();
315         }
316     }
317 
318     private void completeRingClose() {
319         if (closeCompleted) {
320             // already done.
321             return;
322         }
323         closeCompleted = true;
324         ringBuffer.close();
325         try {
326             eventfd.close();
327         } catch (IOException e) {
328             logger.warn("Failed to close eventfd", e);
329         }
330         PlatformDependent.freeMemory(eventfdReadBuf);
331     }
332 
333     @Override
334     public IoRegistration register(IoEventLoop eventLoop, IoHandle handle) throws Exception {
335         IoUringIoHandle ioHandle = cast(handle);
336         if (shuttingDown) {
337             throw new RejectedExecutionException("IoEventLoop is shutting down");
338         }
339         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(eventLoop, ioHandle);
340         for (;;) {
341             int id = nextRegistrationId();
342             DefaultIoUringIoRegistration old = registrations.put(id, registration);
343             if (old != null) {
344                 assert old.handle != registration.handle;
345                 registrations.put(id, old);
346             } else {
347                 registration.setId(id);
348                 break;
349             }
350         }
351 
352         return registration;
353     }
354 
355     private int nextRegistrationId() {
356         int id;
357         do {
358             id = nextRegistrationId++;
359         } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
360         return id;
361     }
362 
363     private final class DefaultIoUringIoRegistration implements IoUringIoRegistration {
364         private final Promise<?> cancellationPromise;
365         private final IoEventLoop eventLoop;
366         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
367         final IoUringIoHandle handle;
368 
369         private boolean removeLater;
370         private int outstandingCompletions;
371         private int id;
372 
373         DefaultIoUringIoRegistration(IoEventLoop eventLoop, IoUringIoHandle handle) {
374             this.eventLoop = eventLoop;
375             this.handle = handle;
376             this.cancellationPromise = eventLoop.newPromise();
377         }
378 
379         void setId(int id) {
380             this.id = id;
381         }
382 
383         @Override
384         public long submit(IoOps ops) {
385             IoUringIoOps ioOps = (IoUringIoOps) ops;
386             if (!isValid()) {
387                 return INVALID_ID;
388             }
389             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
390             if (eventLoop.inEventLoop()) {
391                 submit0(ioOps, udata);
392             } else {
393                 eventLoop.execute(() -> submit0(ioOps, udata));
394             }
395             return udata;
396         }
397 
398         private void submit0(IoUringIoOps ioOps, long udata) {
399             if (ioOps == SUBMIT_AND_RUN_ALL) {
400                 submitAndRunNow();
401             } else {
402                 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
403                         ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
404                         ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
405                 );
406                 outstandingCompletions++;
407             }
408         }
409 
410         @Override
411         public IoUringIoHandler ioHandler() {
412             return IoUringIoHandler.this;
413         }
414 
415         @Override
416         public void cancel() {
417             if (!cancellationPromise.trySuccess(null)) {
418                 // Already cancelled.
419                 return;
420             }
421             if (eventLoop.inEventLoop()) {
422                 tryRemove();
423             } else {
424                 eventLoop.execute(this::tryRemove);
425             }
426         }
427 
428         @Override
429         public Future<?> cancelFuture() {
430             return cancellationPromise;
431         }
432 
433         private void tryRemove() {
434             if (outstandingCompletions > 0) {
435                 // We have some completions outstanding, we will remove the id <-> registration mapping
436                 // once these are done.
437                 removeLater = true;
438                 return;
439             }
440             remove();
441         }
442 
443         private void remove() {
444             DefaultIoUringIoRegistration old = registrations.remove(id);
445             assert old == this;
446         }
447 
448         void close() {
449             // Closing the handle will also cancel the registration.
450             // It's important that we not manually cancel as close() might need to submit some work to the ring.
451             assert eventLoop.inEventLoop();
452             try {
453                 handle.close();
454             } catch (Exception e) {
455                 logger.debug("Exception during closing " + handle, e);
456             }
457         }
458 
459         void handle(int res, int flags, byte op, short data) {
460             event.update(res, flags, op, data);
461             handle.handle(this, event);
462             if (--outstandingCompletions == 0 && removeLater) {
463                 // No more outstanding completions, remove the fd <-> registration mapping now.
464                 removeLater = false;
465                 remove();
466             }
467         }
468     }
469 
470     private static IoUringIoHandle cast(IoHandle handle) {
471         if (handle instanceof IoUringIoHandle) {
472             return (IoUringIoHandle) handle;
473         }
474         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
475     }
476 
477     @Override
478     public void wakeup(IoEventLoop eventLoop) {
479         if (!eventLoop.inEventLoop() && !eventfdAsyncNotify.getAndSet(true)) {
480             // write to the eventfd which will then trigger an eventfd read completion.
481             Native.eventFdWrite(eventfd.intValue(), 1L);
482         }
483     }
484 
485     @Override
486     public boolean isCompatible(Class<? extends IoHandle> handleType) {
487         return IoUringIoHandle.class.isAssignableFrom(handleType);
488     }
489 
490     /**
491      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
492      */
493     byte[] inet4AddressArray() {
494         return inet4AddressArray;
495     }
496 
497     /**
498      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
499      */
500     byte[] inet6AddressArray() {
501         return inet6AddressArray;
502     }
503 
504     /**
505      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
506      *
507      * @return factory
508      */
509     public static IoHandlerFactory newFactory() {
510         return newFactory(new IoUringIoHandlerConfiguration());
511     }
512 
513     /**
514      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
515      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
516      *
517      * @param  ringSize     the size of the ring.
518      * @return              factory
519      */
520     public static IoHandlerFactory newFactory(int ringSize) {
521         IoUringIoHandlerConfiguration configuration = new IoUringIoHandlerConfiguration();
522         configuration.setRingSize(ringSize);
523         return () -> new IoUringIoHandler(configuration);
524     }
525 
526     /**
527      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
528      * Each {@link IoUringIoHandler} will use same option
529      * @param config the io_uring configuration
530      * @return factory
531      */
532     public static IoHandlerFactory newFactory(IoUringIoHandlerConfiguration config) {
533         IoUring.ensureAvailability();
534         ObjectUtil.checkNotNull(config, "config");
535         return () -> new IoUringIoHandler(config);
536     }
537 
538 }