1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoHandle;
19 import io.netty.channel.IoHandler;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.channel.unix.Buffer;
25 import io.netty.channel.unix.Errors;
26 import io.netty.channel.unix.FileDescriptor;
27 import io.netty.channel.unix.IovArray;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.ThreadAwareExecutor;
31 import io.netty.util.internal.CleanableDirectBuffer;
32 import io.netty.util.internal.ObjectUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.io.UncheckedIOException;
39 import java.nio.ByteBuffer;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static java.lang.Math.max;
47 import static java.lang.Math.min;
48 import static java.util.Objects.requireNonNull;
49
50
51
52
53 public final class IoUringIoHandler implements IoHandler {
54 private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55
56 private final RingBuffer ringBuffer;
57 private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58 private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59
60 private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61 private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62
63 private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64 private final FileDescriptor eventfd;
65 private final CleanableDirectBuffer eventfdReadBufCleanable;
66 private final ByteBuffer eventfdReadBuf;
67 private final long eventfdReadBufAddress;
68 private final CleanableDirectBuffer timeoutMemoryCleanable;
69 private final ByteBuffer timeoutMemory;
70 private final long timeoutMemoryAddress;
71 private final IovArray iovArray;
72 private final MsgHdrMemoryArray msgHdrMemoryArray;
73 private long eventfdReadSubmitted;
74 private boolean eventFdClosing;
75 private volatile boolean shuttingDown;
76 private boolean closeCompleted;
77 private int nextRegistrationId = Integer.MIN_VALUE;
78
79
80 private static final int EVENTFD_ID = Integer.MAX_VALUE;
81 private static final int RINGFD_ID = EVENTFD_ID - 1;
82 private static final int INVALID_ID = 0;
83
84 private static final int KERNEL_TIMESPEC_SIZE = 16;
85
86 private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
87 private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
88
89 private final ThreadAwareExecutor executor;
90
91 IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
92
93 IoUring.ensureAvailability();
94 this.executor = requireNonNull(executor, "executor");
95 requireNonNull(config, "config");
96 int setupFlags = Native.setupFlags(config.singleIssuer());
97
98
99
100 int cqSize = 2 * config.getRingSize();
101 if (config.needSetupCqeSize()) {
102 assert IoUring.isSetupCqeSizeSupported();
103 setupFlags |= Native.IORING_SETUP_CQSIZE;
104 cqSize = config.getCqSize();
105 }
106 this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
107 if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
108 int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
109 int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
110 int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
111 if (result < 0) {
112
113 ringBuffer.close();
114 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
115 }
116 }
117
118 registeredIoUringBufferRing = new IntObjectHashMap<>();
119 Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
120 if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
121 for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
122 try {
123 IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
124 registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
125 } catch (Errors.NativeIoException e) {
126 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
127 bufferRing.close();
128 }
129
130 ringBuffer.close();
131 throw new UncheckedIOException(e);
132 }
133 }
134 }
135
136 registrations = new IntObjectHashMap<>();
137 eventfd = Native.newBlockingEventFd();
138 eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
139 eventfdReadBuf = eventfdReadBufCleanable.buffer();
140 eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
141 timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
142 timeoutMemory = timeoutMemoryCleanable.buffer();
143 timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
144 iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
145 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
146 }
147
148 @Override
149 public void initialize() {
150 ringBuffer.enable();
151
152 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
153 bufferRing.initialize();
154 }
155 }
156
157 @Override
158 public int run(IoHandlerContext context) {
159 if (closeCompleted) {
160 if (context.shouldReportActiveIoTime()) {
161 context.reportActiveIoTime(0);
162 }
163 return 0;
164 }
165 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
166 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
167 if (!completionQueue.hasCompletions() && context.canBlock()) {
168 if (eventfdReadSubmitted == 0) {
169 submitEventFdRead();
170 }
171 long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
172 submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
173 } else {
174
175 submitAndClearNow(submissionQueue);
176 }
177
178 int processed;
179 if (context.shouldReportActiveIoTime()) {
180
181 long activeIoStartTimeNanos = System.nanoTime();
182 processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
183 long activeIoEndTimeNanos = System.nanoTime();
184 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
185 } else {
186 processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
187 }
188 return processed;
189 }
190
191 private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
192 CompletionCallback callback) {
193 int processed = 0;
194
195
196 for (int i = 0; i < 128; i++) {
197 int p = completionQueue.process(callback);
198 if ((submissionQueue.flags() & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
199 logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
200 completionQueue.ringEntries);
201 submitAndClearNow(submissionQueue);
202 } else if (p == 0 &&
203
204 (submissionQueue.count() == 0 ||
205
206
207 (submitAndClearNow(submissionQueue) == 0 && !completionQueue.hasCompletions()))) {
208 break;
209 }
210 processed += p;
211 }
212 return processed;
213 }
214
215 private int submitAndClearNow(SubmissionQueue submissionQueue) {
216 int submitted = submissionQueue.submitAndGetNow();
217
218
219
220 iovArray.clear();
221 msgHdrMemoryArray.clear();
222 return submitted;
223 }
224
225 private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
226 throws Errors.NativeIoException {
227 short bufferRingSize = bufferRingConfig.bufferRingSize();
228 short bufferGroupId = bufferRingConfig.bufferGroupId();
229 int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
230 long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
231 if (ioUringBufRingAddr < 0) {
232 throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
233 }
234 return new IoUringBufferRing(ringFd,
235 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
236 bufferRingSize, bufferRingConfig.batchSize(),
237 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
238 bufferRingConfig.isBatchAllocation()
239 );
240 }
241
242 IoUringBufferRing findBufferRing(short bgId) {
243 IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
244 if (cached != null) {
245 return cached;
246 }
247 throw new IllegalArgumentException(
248 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
249 );
250 }
251
252 private static void handleLoopException(Throwable throwable) {
253 logger.warn("Unexpected exception in the IO event loop.", throwable);
254
255
256
257 try {
258 Thread.sleep(100);
259 } catch (InterruptedException ignore) {
260
261 }
262 }
263
264 private void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
265 try {
266 int id = UserData.decodeId(udata);
267 byte op = UserData.decodeOp(udata);
268 short data = UserData.decodeData(udata);
269
270 if (logger.isTraceEnabled()) {
271 logger.trace("completed(ring {}): {}(id={}, res={})",
272 ringBuffer.fd(), Native.opToStr(op), data, res);
273 }
274 if (id == EVENTFD_ID) {
275 handleEventFdRead();
276 return;
277 }
278 if (id == RINGFD_ID) {
279
280 return;
281 }
282 DefaultIoUringIoRegistration registration = registrations.get(id);
283 if (registration == null) {
284 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
285 Native.opToStr(op), id, res);
286 return;
287 }
288 registration.handle(res, flags, op, data, extraCqeData);
289 } catch (Error e) {
290 throw e;
291 } catch (Throwable throwable) {
292 handleLoopException(throwable);
293 }
294 }
295
296 private void handleEventFdRead() {
297 eventfdReadSubmitted = 0;
298 if (!eventFdClosing) {
299 eventfdAsyncNotify.set(false);
300 submitEventFdRead();
301 }
302 }
303
304 private void submitEventFdRead() {
305 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
306 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
307
308 eventfdReadSubmitted = submissionQueue.addEventFdRead(
309 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
310 }
311
312 private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
313 boolean linkTimeout, long timeoutNanoSeconds) {
314 if (timeoutNanoSeconds != -1) {
315 long udata = UserData.encode(RINGFD_ID,
316 linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
317
318
319 long seconds, nanoSeconds;
320 if (timeoutNanoSeconds == 0) {
321 seconds = 0;
322 nanoSeconds = 0;
323 } else {
324 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
325 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
326 }
327
328 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
329 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
330 if (linkTimeout) {
331 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
332 } else {
333 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
334 }
335 }
336 int submitted = submissionQueue.submitAndGet();
337
338
339 iovArray.clear();
340 msgHdrMemoryArray.clear();
341 return submitted;
342 }
343
344 @Override
345 public void prepareToDestroy() {
346 shuttingDown = true;
347 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
348 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
349
350 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
351
352 for (DefaultIoUringIoRegistration registration: copy) {
353 registration.close();
354 }
355
356
357 Native.eventFdWrite(eventfd.intValue(), 1L);
358
359
360 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
361 submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
362
363
364 submissionQueue.submitAndGet();
365
366 while (completionQueue.hasCompletions()) {
367 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
368 if (submissionQueue.count() > 0) {
369 submissionQueue.submitAndGetNow();
370 }
371 }
372 }
373
374 @Override
375 public void destroy() {
376 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
377 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
378 drainEventFd();
379 if (submissionQueue.remaining() < 2) {
380
381
382 submissionQueue.submit();
383 }
384
385 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
386
387
388
389
390
391 submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
392
393 submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
394 completionQueue.process(this::handle);
395 for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
396 ioUringBufferRing.close();
397 }
398 completeRingClose();
399 }
400
401
402
403
404
405 private void drainEventFd() {
406 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
407 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
408 assert !eventFdClosing;
409 eventFdClosing = true;
410 boolean eventPending = eventfdAsyncNotify.getAndSet(true);
411 if (eventPending) {
412
413
414 while (eventfdReadSubmitted == 0) {
415 submitEventFdRead();
416 submissionQueue.submit();
417 }
418
419 class DrainFdEventCallback implements CompletionCallback {
420 boolean eventFdDrained;
421
422 @Override
423 public void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
424 if (UserData.decodeId(udata) == EVENTFD_ID) {
425 eventFdDrained = true;
426 }
427 IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
428 }
429 }
430 final DrainFdEventCallback handler = new DrainFdEventCallback();
431 completionQueue.process(handler);
432 while (!handler.eventFdDrained) {
433 submissionQueue.submitAndGet();
434 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
435 }
436 }
437
438
439
440 if (eventfdReadSubmitted != 0) {
441 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
442 submissionQueue.addCancel(eventfdReadSubmitted, udata);
443 eventfdReadSubmitted = 0;
444 submissionQueue.submit();
445 }
446 }
447
448 private void completeRingClose() {
449 if (closeCompleted) {
450
451 return;
452 }
453 closeCompleted = true;
454 ringBuffer.close();
455 try {
456 eventfd.close();
457 } catch (IOException e) {
458 logger.warn("Failed to close eventfd", e);
459 }
460 eventfdReadBufCleanable.clean();
461 timeoutMemoryCleanable.clean();
462 iovArray.release();
463 msgHdrMemoryArray.release();
464 }
465
466 @Override
467 public IoRegistration register(IoHandle handle) throws Exception {
468 IoUringIoHandle ioHandle = cast(handle);
469 if (shuttingDown) {
470 throw new IllegalStateException("IoUringIoHandler is shutting down");
471 }
472 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
473 for (;;) {
474 int id = nextRegistrationId();
475 DefaultIoUringIoRegistration old = registrations.put(id, registration);
476 if (old != null) {
477 assert old.handle != registration.handle;
478 registrations.put(id, old);
479 } else {
480 registration.setId(id);
481 ioHandle.registered();
482 break;
483 }
484 }
485
486 return registration;
487 }
488
489 private int nextRegistrationId() {
490 int id;
491 do {
492 id = nextRegistrationId++;
493 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
494 return id;
495 }
496
497 private final class DefaultIoUringIoRegistration implements IoRegistration {
498 private final AtomicBoolean canceled = new AtomicBoolean();
499 private final ThreadAwareExecutor executor;
500 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
501 final IoUringIoHandle handle;
502
503 private boolean removeLater;
504 private int outstandingCompletions;
505 private int id;
506
507 DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
508 this.executor = executor;
509 this.handle = handle;
510 }
511
512 void setId(int id) {
513 this.id = id;
514 }
515
516 @Override
517 public long submit(IoOps ops) {
518 IoUringIoOps ioOps = (IoUringIoOps) ops;
519 if (!isValid()) {
520 return INVALID_ID;
521 }
522 if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
523
524
525 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
526 }
527 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
528 if (executor.isExecutorThread(Thread.currentThread())) {
529 submit0(ioOps, udata);
530 } else {
531 executor.execute(() -> submit0(ioOps, udata));
532 }
533 return udata;
534 }
535
536 private void submit0(IoUringIoOps ioOps, long udata) {
537 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
538 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
539 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
540 );
541 outstandingCompletions++;
542 }
543
544 @SuppressWarnings("unchecked")
545 @Override
546 public <T> T attachment() {
547 return (T) IoUringIoHandler.this;
548 }
549
550 @Override
551 public boolean isValid() {
552 return !canceled.get();
553 }
554
555 @Override
556 public boolean cancel() {
557 if (!canceled.compareAndSet(false, true)) {
558
559 return false;
560 }
561 if (executor.isExecutorThread(Thread.currentThread())) {
562 tryRemove();
563 } else {
564 executor.execute(this::tryRemove);
565 }
566 return true;
567 }
568
569 private void tryRemove() {
570 if (outstandingCompletions > 0) {
571
572
573 removeLater = true;
574 return;
575 }
576 remove();
577 }
578
579 private void remove() {
580 DefaultIoUringIoRegistration old = registrations.remove(id);
581 assert old == this;
582 handle.unregistered();
583 }
584
585 void close() {
586
587
588 assert executor.isExecutorThread(Thread.currentThread());
589 try {
590 handle.close();
591 } catch (Exception e) {
592 logger.debug("Exception during closing " + handle, e);
593 }
594 }
595
596 void handle(int res, int flags, byte op, short data, ByteBuffer extraCqeData) {
597 event.update(res, flags, op, data, extraCqeData);
598 handle.handle(this, event);
599
600
601 if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
602
603 removeLater = false;
604 remove();
605 }
606 }
607 }
608
609 private static IoUringIoHandle cast(IoHandle handle) {
610 if (handle instanceof IoUringIoHandle) {
611 return (IoUringIoHandle) handle;
612 }
613 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
614 }
615
616 @Override
617 public void wakeup() {
618 if (!executor.isExecutorThread(Thread.currentThread()) &&
619 !eventfdAsyncNotify.getAndSet(true)) {
620
621 Native.eventFdWrite(eventfd.intValue(), 1L);
622 }
623 }
624
625 @Override
626 public boolean isCompatible(Class<? extends IoHandle> handleType) {
627 return IoUringIoHandle.class.isAssignableFrom(handleType);
628 }
629
630 IovArray iovArray() {
631 if (iovArray.isFull()) {
632
633 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
634 }
635 assert iovArray.count() == 0;
636 return iovArray;
637 }
638
639 MsgHdrMemoryArray msgHdrMemoryArray() {
640 if (msgHdrMemoryArray.isFull()) {
641
642 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
643 }
644 return msgHdrMemoryArray;
645 }
646
647
648
649
650 byte[] inet4AddressArray() {
651 return inet4AddressArray;
652 }
653
654
655
656
657 byte[] inet6AddressArray() {
658 return inet6AddressArray;
659 }
660
661
662
663
664
665
666 public static IoHandlerFactory newFactory() {
667 return newFactory(new IoUringIoHandlerConfig());
668 }
669
670
671
672
673
674
675
676
677 public static IoHandlerFactory newFactory(int ringSize) {
678 IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
679 configuration.setRingSize(ringSize);
680 return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
681 }
682
683
684
685
686
687
688
689 public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
690 IoUring.ensureAvailability();
691 final IoUringIoHandlerConfig copy = ObjectUtil.checkNotNull(config, "config").verifyAndClone();
692 return new IoHandlerFactory() {
693 @Override
694 public IoHandler newHandler(ThreadAwareExecutor eventLoop) {
695 return new IoUringIoHandler(eventLoop, copy);
696 }
697
698 @Override
699 public boolean isChangingThreadSupported() {
700 return !copy.singleIssuer();
701 }
702 };
703 }
704 }