View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoEventLoop;
19  import io.netty.channel.IoExecutionContext;
20  import io.netty.channel.IoHandle;
21  import io.netty.channel.IoHandler;
22  import io.netty.channel.IoHandlerFactory;
23  import io.netty.channel.IoOps;
24  import io.netty.channel.IoRegistration;
25  import io.netty.channel.unix.FileDescriptor;
26  import io.netty.util.collection.IntObjectHashMap;
27  import io.netty.util.collection.IntObjectMap;
28  import io.netty.util.concurrent.Future;
29  import io.netty.util.concurrent.Promise;
30  import io.netty.util.internal.ObjectUtil;
31  import io.netty.util.internal.PlatformDependent;
32  import io.netty.util.internal.StringUtil;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.util.ArrayList;
38  import java.util.List;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  
43  import static java.util.Objects.requireNonNull;
44  
45  /**
46   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
47   */
48  public final class IoUringIoHandler implements IoHandler {
49      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
50      private static final short RING_CLOSE = 1;
51  
52      private final RingBuffer ringBuffer;
53      private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
54      // The maximum number of bytes for an InetAddress / Inet6Address
55      private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
56      private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
57  
58      private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
59      private final FileDescriptor eventfd;
60      private final long eventfdReadBuf;
61  
62      private long eventfdReadSubmitted;
63      private boolean eventFdClosing;
64      private volatile boolean shuttingDown;
65      private boolean closeCompleted;
66      private int nextRegistrationId = Integer.MIN_VALUE;
67  
68      // these two ids are used internally any so can't be used by nextRegistrationId().
69      private static final int EVENTFD_ID = Integer.MAX_VALUE;
70      private static final int RINGFD_ID = EVENTFD_ID - 1;
71  
72      IoUringIoHandler(RingBuffer ringBuffer) {
73          // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
74          IoUring.ensureAvailability();
75          this.ringBuffer = requireNonNull(ringBuffer, "ringBuffer");
76          registrations = new IntObjectHashMap<>();
77          eventfd = Native.newBlockingEventFd();
78          eventfdReadBuf = PlatformDependent.allocateMemory(8);
79      }
80  
81      @Override
82      public int run(IoExecutionContext context) {
83          SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
84          CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
85          if (!completionQueue.hasCompletions() && context.canBlock()) {
86              if (eventfdReadSubmitted == 0) {
87                  submitEventFdRead();
88              }
89              if (context.deadlineNanos() != -1) {
90                  submitTimeout(context);
91              }
92              submissionQueue.submitAndWait();
93          } else {
94              submissionQueue.submit();
95          }
96          return completionQueue.process(this::handle);
97      }
98  
99      private void handle(int res, int flags, int id, byte op, short data) {
100         if (id == EVENTFD_ID) {
101             handleEventFdRead();
102             return;
103         }
104         if (id == RINGFD_ID) {
105             if (op == Native.IORING_OP_NOP && data == RING_CLOSE) {
106                 completeRingClose();
107             }
108             return;
109         }
110         DefaultIoUringIoRegistration registration = registrations.get(id);
111         if (registration == null) {
112             logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
113                     Native.opToStr(op), id, res);
114             return;
115         }
116         registration.handle(res, flags, op, data);
117     }
118 
119     private void handleEventFdRead() {
120         eventfdReadSubmitted = 0;
121         if (!eventFdClosing) {
122             eventfdAsyncNotify.set(false);
123             submitEventFdRead();
124         }
125     }
126 
127     private void submitEventFdRead() {
128         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
129         eventfdReadSubmitted = submissionQueue.addEventFdRead(
130                 eventfd.intValue(), eventfdReadBuf, 0, 8, EVENTFD_ID, (short) 0);
131     }
132 
133     private void submitTimeout(IoExecutionContext context) {
134         long delayNanos = context.delayNanos(System.nanoTime());
135         ringBuffer.ioUringSubmissionQueue().addTimeout(
136                 ringBuffer.fd(), delayNanos, RINGFD_ID, (short) 0);
137     }
138 
139     @Override
140     public void prepareToDestroy() {
141         shuttingDown = true;
142         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
143         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
144 
145         List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
146         // Ensure all previously submitted IOs get to complete before closing all fds.
147         submissionQueue.addNop(ringBuffer.fd(), Native.IOSQE_IO_DRAIN, RINGFD_ID, (short) 0);
148 
149         for (DefaultIoUringIoRegistration registration: copy) {
150             registration.close();
151             if (submissionQueue.count() > 0) {
152                 submissionQueue.submit();
153             }
154             if (completionQueue.hasCompletions()) {
155                 completionQueue.process(this::handle);
156             }
157         }
158     }
159 
160     @Override
161     public void destroy() {
162         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
163         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
164         drainEventFd();
165         if (submissionQueue.remaining() < 2) {
166             // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
167             // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
168             submissionQueue.submit();
169         }
170         // Try to drain all the IO from the queue first...
171         submissionQueue.addNop(ringBuffer.fd(), Native.IOSQE_IO_DRAIN, RINGFD_ID, (short) 0);
172         // ... but only wait for 200 milliseconds on this
173         submissionQueue.addLinkTimeout(ringBuffer.fd(), TimeUnit.MILLISECONDS.toNanos(200), RINGFD_ID, (short) 0);
174         submissionQueue.submitAndWait();
175         completionQueue.process(this::handle);
176         completeRingClose();
177     }
178 
179     // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
180     // already been freed (and potentially reallocated by the OS). Because submitted events is gated on the
181     // `eventfdAsyncNotify` flag we can close the gate but may need to read any outstanding events that have
182     // (or will) be written.
183     private void drainEventFd() {
184         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
185         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
186         assert !eventFdClosing;
187         eventFdClosing = true;
188         boolean eventPending = eventfdAsyncNotify.getAndSet(true);
189         if (eventPending) {
190             // There is an event that has been or will be written by another thread, so we must wait for the event.
191             // Make sure we're actually listening for writes to the event fd.
192             while (eventfdReadSubmitted == 0) {
193                 submitEventFdRead();
194                 submissionQueue.submit();
195             }
196             // Drain the eventfd of the pending wakup.
197             class DrainFdEventCallback implements CompletionCallback {
198                 boolean eventFdDrained;
199 
200                 @Override
201                 public void handle(int res, int flags, int id, byte op, short data) {
202                     if (id == EVENTFD_ID) {
203                         eventFdDrained = true;
204                     }
205                     IoUringIoHandler.this.handle(res, flags, id, op, data);
206                 }
207             }
208             final DrainFdEventCallback handler = new DrainFdEventCallback();
209             completionQueue.process(handler);
210             while (!handler.eventFdDrained) {
211                 submissionQueue.submitAndWait();
212                 completionQueue.process(handler);
213             }
214         }
215         // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
216         // transition back to false, thus we should never have any more events written.
217         // So, if we have a read event pending, we can cancel it.
218         if (eventfdReadSubmitted != 0) {
219             submissionQueue.addCancel(eventfd.intValue(), eventfdReadSubmitted, EVENTFD_ID);
220             eventfdReadSubmitted = 0;
221             submissionQueue.submit();
222         }
223     }
224 
225     private void completeRingClose() {
226         if (closeCompleted) {
227             // already done.
228             return;
229         }
230         closeCompleted = true;
231         ringBuffer.close();
232         try {
233             eventfd.close();
234         } catch (IOException e) {
235             logger.warn("Failed to close eventfd", e);
236         }
237         PlatformDependent.freeMemory(eventfdReadBuf);
238     }
239 
240     @Override
241     public IoRegistration register(IoEventLoop eventLoop, IoHandle handle) throws Exception {
242         IoUringIoHandle ioHandle = cast(handle);
243         if (shuttingDown) {
244             throw new RejectedExecutionException("IoEventLoop is shutting down");
245         }
246         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(eventLoop, ioHandle);
247         for (;;) {
248             int id = nextRegistrationId();
249             DefaultIoUringIoRegistration old = registrations.put(id, registration);
250             if (old != null) {
251                 assert old.handle != registration.handle;
252                 registrations.put(id, old);
253             } else {
254                 registration.setId(id);
255                 break;
256             }
257         }
258 
259         ringBuffer.ioUringSubmissionQueue().incrementHandledFds();
260         return registration;
261     }
262 
263     private int nextRegistrationId() {
264         int id;
265         do {
266             id = nextRegistrationId++;
267         } while (id == RINGFD_ID || id == EVENTFD_ID);
268         return id;
269     }
270 
271     private final class DefaultIoUringIoRegistration implements IoUringIoRegistration {
272         private final Promise<?> cancellationPromise;
273         private final IoEventLoop eventLoop;
274         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
275         final IoUringIoHandle handle;
276 
277         private boolean removeLater;
278         private int outstandingCompletions;
279         private int id;
280 
281         DefaultIoUringIoRegistration(IoEventLoop eventLoop, IoUringIoHandle handle) {
282             this.eventLoop = eventLoop;
283             this.handle = handle;
284             this.cancellationPromise = eventLoop.newPromise();
285         }
286 
287         void setId(int id) {
288             this.id = id;
289         }
290 
291         @Override
292         public long submit(IoOps ops) {
293             IoUringIoOps ioOps = (IoUringIoOps) ops;
294             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
295             if (!isValid()) {
296                 return udata;
297             }
298             if (eventLoop.inEventLoop()) {
299                 submit0(ioOps, udata);
300             } else {
301                 eventLoop.execute(() -> submit0(ioOps, udata));
302             }
303             return udata;
304         }
305 
306         private void submit0(IoUringIoOps ioOps, long udata) {
307             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
308                     ioOps.rwFlags(), ioOps.fd(), ioOps.bufferAddress(), ioOps.length(), ioOps.offset(), udata);
309             outstandingCompletions++;
310         }
311 
312         @Override
313         public IoUringIoHandler ioHandler() {
314             return IoUringIoHandler.this;
315         }
316 
317         @Override
318         public void cancel() {
319             if (cancellationPromise.trySuccess(null)) {
320                 return;
321             }
322             if (eventLoop.inEventLoop()) {
323                 tryRemove();
324             } else {
325                 eventLoop.execute(this::tryRemove);
326             }
327         }
328 
329         @Override
330         public Future<?> cancelFuture() {
331             return cancellationPromise;
332         }
333 
334         private void tryRemove() {
335             if (outstandingCompletions > 0) {
336                 // We have some completions outstanding, we will remove the id <-> registration mapping
337                 // once these are done.
338                 removeLater = true;
339                 return;
340             }
341             remove();
342         }
343 
344         private void remove() {
345             DefaultIoUringIoRegistration old = registrations.remove(id);
346             assert old == this;
347             ringBuffer.ioUringSubmissionQueue().decrementHandledFds();
348         }
349 
350         void close() {
351             assert eventLoop.inEventLoop();
352             try {
353                 cancel();
354             } catch (Exception e) {
355                 logger.debug("Exception during canceling " + this, e);
356             }
357             try {
358                 handle.close();
359             } catch (Exception e) {
360                 logger.debug("Exception during closing " + handle, e);
361             }
362         }
363 
364         void handle(int res, int flags, byte op, short data) {
365             event.update(res, flags, op, data);
366             handle.handle(this, event);
367             if (--outstandingCompletions == 0 && removeLater) {
368                 // No more outstanding completions, remove the fd <-> registration mapping now.
369                 removeLater = false;
370                 remove();
371             }
372         }
373     }
374 
375     private static IoUringIoHandle cast(IoHandle handle) {
376         if (handle instanceof IoUringIoHandle) {
377             return (IoUringIoHandle) handle;
378         }
379         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
380     }
381 
382     @Override
383     public void wakeup(IoEventLoop eventLoop) {
384         if (!eventLoop.inEventLoop() && !eventfdAsyncNotify.getAndSet(true)) {
385             // write to the eventfd which will then trigger an eventfd read completion.
386             Native.eventFdWrite(eventfd.intValue(), 1L);
387         }
388     }
389 
390     @Override
391     public boolean isCompatible(Class<? extends IoHandle> handleType) {
392         return IoUringIoHandle.class.isAssignableFrom(handleType);
393     }
394 
395     /**
396      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
397      */
398     byte[] inet4AddressArray() {
399         return inet4AddressArray;
400     }
401 
402     /**
403      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
404      */
405     byte[] inet6AddressArray() {
406         return inet6AddressArray;
407     }
408 
409     /**
410      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
411      *
412      * @return factory
413      */
414     public static IoHandlerFactory newFactory() {
415         IoUring.ensureAvailability();
416         return () -> new IoUringIoHandler(Native.createRingBuffer());
417     }
418 
419     /**
420      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
421      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
422      *
423      * @param  ringSize     the size of the ring.
424      * @return              factory
425      */
426     public static IoHandlerFactory newFactory(int ringSize) {
427         IoUring.ensureAvailability();
428         ObjectUtil.checkPositive(ringSize, "ringSize");
429         return () -> new IoUringIoHandler(Native.createRingBuffer(ringSize));
430     }
431 }