View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.IoEventLoop;
19  import io.netty.channel.IoExecutionContext;
20  import io.netty.channel.IoHandle;
21  import io.netty.channel.IoHandler;
22  import io.netty.channel.IoHandlerFactory;
23  import io.netty.channel.IoOps;
24  import io.netty.channel.IoRegistration;
25  import io.netty.channel.unix.FileDescriptor;
26  import io.netty.util.collection.IntObjectHashMap;
27  import io.netty.util.collection.IntObjectMap;
28  import io.netty.util.concurrent.Future;
29  import io.netty.util.concurrent.Promise;
30  import io.netty.util.internal.ObjectUtil;
31  import io.netty.util.internal.PlatformDependent;
32  import io.netty.util.internal.StringUtil;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.util.ArrayList;
38  import java.util.List;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  
43  import static java.util.Objects.requireNonNull;
44  
45  /**
46   * {@link IoHandler} which is implemented in terms of the Linux-specific {@code io_uring} API.
47   */
48  public final class IoUringIoHandler implements IoHandler {
49      private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
50      private static final short RING_CLOSE = 1;
51  
52      private final RingBuffer ringBuffer;
53      private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
54      // The maximum number of bytes for an InetAddress / Inet6Address
55      private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
56      private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
57  
58      private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
59      private final FileDescriptor eventfd;
60      private final long eventfdReadBuf;
61  
62      private long eventfdReadSubmitted;
63      private boolean eventFdClosing;
64      private volatile boolean shuttingDown;
65      private boolean closeCompleted;
66      private int nextRegistrationId = Integer.MIN_VALUE;
67  
68      // these two ids are used internally any so can't be used by nextRegistrationId().
69      private static final int EVENTFD_ID = Integer.MAX_VALUE;
70      private static final int RINGFD_ID = EVENTFD_ID - 1;
71      private static final int INVALID_ID = 0;
72  
73      IoUringIoHandler(RingBuffer ringBuffer) {
74          // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
75          IoUring.ensureAvailability();
76          this.ringBuffer = requireNonNull(ringBuffer, "ringBuffer");
77          registrations = new IntObjectHashMap<>();
78          eventfd = Native.newBlockingEventFd();
79          eventfdReadBuf = PlatformDependent.allocateMemory(8);
80      }
81  
82      @Override
83      public int run(IoExecutionContext context) {
84          SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
85          CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
86          if (!completionQueue.hasCompletions() && context.canBlock()) {
87              if (eventfdReadSubmitted == 0) {
88                  submitEventFdRead();
89              }
90              if (context.deadlineNanos() != -1) {
91                  submitTimeout(context);
92              }
93              submissionQueue.submitAndWait();
94          } else {
95              submissionQueue.submit();
96          }
97          return completionQueue.process(this::handle);
98      }
99  
100     private void handle(int res, int flags, int id, byte op, short data) {
101         if (id == EVENTFD_ID) {
102             handleEventFdRead();
103             return;
104         }
105         if (id == RINGFD_ID) {
106             if (op == Native.IORING_OP_NOP && data == RING_CLOSE) {
107                 completeRingClose();
108             }
109             return;
110         }
111         DefaultIoUringIoRegistration registration = registrations.get(id);
112         if (registration == null) {
113             logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
114                     Native.opToStr(op), id, res);
115             return;
116         }
117         registration.handle(res, flags, op, data);
118     }
119 
120     private void handleEventFdRead() {
121         eventfdReadSubmitted = 0;
122         if (!eventFdClosing) {
123             eventfdAsyncNotify.set(false);
124             submitEventFdRead();
125         }
126     }
127 
128     private void submitEventFdRead() {
129         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
130         eventfdReadSubmitted = submissionQueue.addEventFdRead(
131                 eventfd.intValue(), eventfdReadBuf, 0, 8, EVENTFD_ID, (short) 0);
132     }
133 
134     private void submitTimeout(IoExecutionContext context) {
135         long delayNanos = context.delayNanos(System.nanoTime());
136         ringBuffer.ioUringSubmissionQueue().addTimeout(
137                 ringBuffer.fd(), delayNanos, RINGFD_ID, (short) 0);
138     }
139 
140     @Override
141     public void prepareToDestroy() {
142         shuttingDown = true;
143         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
144         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
145 
146         List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
147 
148         for (DefaultIoUringIoRegistration registration: copy) {
149             registration.close();
150         }
151 
152         // Ensure all previously submitted IOs get to complete before tearing down everything.
153         submissionQueue.addNop(ringBuffer.fd(), (byte) Native.IOSQE_IO_DRAIN, RINGFD_ID, (short) 0);
154         submissionQueue.submit();
155         while (completionQueue.hasCompletions()) {
156             completionQueue.process(this::handle);
157 
158             if (submissionQueue.count() > 0) {
159                 submissionQueue.submit();
160             }
161         }
162     }
163 
164     @Override
165     public void destroy() {
166         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
167         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
168         drainEventFd();
169         if (submissionQueue.remaining() < 2) {
170             // We need to submit 2 linked operations. Since they are linked, we cannot allow a submit-call to
171             // separate them. We don't have enough room (< 2) in the queue, so we submit now to make more room.
172             submissionQueue.submit();
173         }
174         // Try to drain all the IO from the queue first...
175         submissionQueue.addNop(ringBuffer.fd(), (byte) Native.IOSQE_IO_DRAIN, RINGFD_ID, (short) 0);
176         // ... but only wait for 200 milliseconds on this
177         submissionQueue.addLinkTimeout(ringBuffer.fd(), TimeUnit.MILLISECONDS.toNanos(200), RINGFD_ID, (short) 0);
178         submissionQueue.submitAndWait();
179         completionQueue.process(this::handle);
180         completeRingClose();
181     }
182 
183     // We need to prevent the race condition where a wakeup event is submitted to a file descriptor that has
184     // already been freed (and potentially reallocated by the OS). Because submitted events is gated on the
185     // `eventfdAsyncNotify` flag we can close the gate but may need to read any outstanding events that have
186     // (or will) be written.
187     private void drainEventFd() {
188         CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
189         SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
190         assert !eventFdClosing;
191         eventFdClosing = true;
192         boolean eventPending = eventfdAsyncNotify.getAndSet(true);
193         if (eventPending) {
194             // There is an event that has been or will be written by another thread, so we must wait for the event.
195             // Make sure we're actually listening for writes to the event fd.
196             while (eventfdReadSubmitted == 0) {
197                 submitEventFdRead();
198                 submissionQueue.submit();
199             }
200             // Drain the eventfd of the pending wakup.
201             class DrainFdEventCallback implements CompletionCallback {
202                 boolean eventFdDrained;
203 
204                 @Override
205                 public void handle(int res, int flags, int id, byte op, short data) {
206                     if (id == EVENTFD_ID) {
207                         eventFdDrained = true;
208                     }
209                     IoUringIoHandler.this.handle(res, flags, id, op, data);
210                 }
211             }
212             final DrainFdEventCallback handler = new DrainFdEventCallback();
213             completionQueue.process(handler);
214             while (!handler.eventFdDrained) {
215                 submissionQueue.submitAndWait();
216                 completionQueue.process(handler);
217             }
218         }
219         // We've consumed any pending eventfd read and `eventfdAsyncNotify` should never
220         // transition back to false, thus we should never have any more events written.
221         // So, if we have a read event pending, we can cancel it.
222         if (eventfdReadSubmitted != 0) {
223             submissionQueue.addCancel(eventfd.intValue(), eventfdReadSubmitted, EVENTFD_ID);
224             eventfdReadSubmitted = 0;
225             submissionQueue.submit();
226         }
227     }
228 
229     private void completeRingClose() {
230         if (closeCompleted) {
231             // already done.
232             return;
233         }
234         closeCompleted = true;
235         ringBuffer.close();
236         try {
237             eventfd.close();
238         } catch (IOException e) {
239             logger.warn("Failed to close eventfd", e);
240         }
241         PlatformDependent.freeMemory(eventfdReadBuf);
242     }
243 
244     @Override
245     public IoRegistration register(IoEventLoop eventLoop, IoHandle handle) throws Exception {
246         IoUringIoHandle ioHandle = cast(handle);
247         if (shuttingDown) {
248             throw new RejectedExecutionException("IoEventLoop is shutting down");
249         }
250         DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(eventLoop, ioHandle);
251         for (;;) {
252             int id = nextRegistrationId();
253             DefaultIoUringIoRegistration old = registrations.put(id, registration);
254             if (old != null) {
255                 assert old.handle != registration.handle;
256                 registrations.put(id, old);
257             } else {
258                 registration.setId(id);
259                 break;
260             }
261         }
262 
263         ringBuffer.ioUringSubmissionQueue().incrementHandledFds();
264         return registration;
265     }
266 
267     private int nextRegistrationId() {
268         int id;
269         do {
270             id = nextRegistrationId++;
271         } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
272         return id;
273     }
274 
275     private final class DefaultIoUringIoRegistration implements IoUringIoRegistration {
276         private final Promise<?> cancellationPromise;
277         private final IoEventLoop eventLoop;
278         private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
279         final IoUringIoHandle handle;
280 
281         private boolean removeLater;
282         private int outstandingCompletions;
283         private int id;
284 
285         DefaultIoUringIoRegistration(IoEventLoop eventLoop, IoUringIoHandle handle) {
286             this.eventLoop = eventLoop;
287             this.handle = handle;
288             this.cancellationPromise = eventLoop.newPromise();
289         }
290 
291         void setId(int id) {
292             this.id = id;
293         }
294 
295         @Override
296         public long submit(IoOps ops) {
297             IoUringIoOps ioOps = (IoUringIoOps) ops;
298             if (!isValid()) {
299                 return INVALID_ID;
300             }
301             long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
302             if (eventLoop.inEventLoop()) {
303                 submit0(ioOps, udata);
304             } else {
305                 eventLoop.execute(() -> submit0(ioOps, udata));
306             }
307             return udata;
308         }
309 
310         private void submit0(IoUringIoOps ioOps, long udata) {
311             ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
312                     ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
313                     ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
314             );
315             outstandingCompletions++;
316         }
317 
318         @Override
319         public IoUringIoHandler ioHandler() {
320             return IoUringIoHandler.this;
321         }
322 
323         @Override
324         public void cancel() {
325             if (!cancellationPromise.trySuccess(null)) {
326                 // Already cancelled.
327                 return;
328             }
329             if (eventLoop.inEventLoop()) {
330                 tryRemove();
331             } else {
332                 eventLoop.execute(this::tryRemove);
333             }
334         }
335 
336         @Override
337         public Future<?> cancelFuture() {
338             return cancellationPromise;
339         }
340 
341         private void tryRemove() {
342             if (outstandingCompletions > 0) {
343                 // We have some completions outstanding, we will remove the id <-> registration mapping
344                 // once these are done.
345                 removeLater = true;
346                 return;
347             }
348             remove();
349         }
350 
351         private void remove() {
352             DefaultIoUringIoRegistration old = registrations.remove(id);
353             assert old == this;
354             ringBuffer.ioUringSubmissionQueue().decrementHandledFds();
355         }
356 
357         void close() {
358             // Closing the handle will also cancel the registration.
359             // It's important that we not manually cancel as close() might need to submit some work to the ring.
360             assert eventLoop.inEventLoop();
361             try {
362                 handle.close();
363             } catch (Exception e) {
364                 logger.debug("Exception during closing " + handle, e);
365             }
366         }
367 
368         void handle(int res, int flags, byte op, short data) {
369             event.update(res, flags, op, data);
370             handle.handle(this, event);
371             if (--outstandingCompletions == 0 && removeLater) {
372                 // No more outstanding completions, remove the fd <-> registration mapping now.
373                 removeLater = false;
374                 remove();
375             }
376         }
377     }
378 
379     private static IoUringIoHandle cast(IoHandle handle) {
380         if (handle instanceof IoUringIoHandle) {
381             return (IoUringIoHandle) handle;
382         }
383         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
384     }
385 
386     @Override
387     public void wakeup(IoEventLoop eventLoop) {
388         if (!eventLoop.inEventLoop() && !eventfdAsyncNotify.getAndSet(true)) {
389             // write to the eventfd which will then trigger an eventfd read completion.
390             Native.eventFdWrite(eventfd.intValue(), 1L);
391         }
392     }
393 
394     @Override
395     public boolean isCompatible(Class<? extends IoHandle> handleType) {
396         return IoUringIoHandle.class.isAssignableFrom(handleType);
397     }
398 
399     /**
400      * {@code byte[]} that can be used as temporary storage to encode the ipv4 address
401      */
402     byte[] inet4AddressArray() {
403         return inet4AddressArray;
404     }
405 
406     /**
407      * {@code byte[]} that can be used as temporary storage to encode the ipv6 address
408      */
409     byte[] inet6AddressArray() {
410         return inet6AddressArray;
411     }
412 
413     /**
414      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
415      *
416      * @return factory
417      */
418     public static IoHandlerFactory newFactory() {
419         IoUring.ensureAvailability();
420         return () -> new IoUringIoHandler(Native.createRingBuffer());
421     }
422 
423     /**
424      * Create a new {@link IoHandlerFactory} that can be used to create {@link IoUringIoHandler}s.
425      * Each {@link IoUringIoHandler} will use a ring of size {@code ringSize}.
426      *
427      * @param  ringSize     the size of the ring.
428      * @return              factory
429      */
430     public static IoHandlerFactory newFactory(int ringSize) {
431         IoUring.ensureAvailability();
432         ObjectUtil.checkPositive(ringSize, "ringSize");
433         return () -> new IoUringIoHandler(Native.createRingBuffer(ringSize));
434     }
435 }