1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoHandle;
19 import io.netty.channel.IoHandler;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.channel.unix.Buffer;
25 import io.netty.channel.unix.Errors;
26 import io.netty.channel.unix.FileDescriptor;
27 import io.netty.channel.unix.IovArray;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.ThreadAwareExecutor;
31 import io.netty.util.internal.CleanableDirectBuffer;
32 import io.netty.util.internal.ObjectUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.io.UncheckedIOException;
39 import java.nio.ByteBuffer;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static java.lang.Math.max;
47 import static java.lang.Math.min;
48 import static java.util.Objects.requireNonNull;
49
50
51
52
53 public final class IoUringIoHandler implements IoHandler {
/** Logger shared by all instances of this handler. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);

/** The ring created in the constructor; closed in {@code completeRingClose()}. */
private final RingBuffer ringBuffer;
/** Buffer rings created from the configuration, keyed by their buffer group id. */
private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
/** Active registrations, keyed by the id that is encoded into the user data of their submissions. */
private final IntObjectMap<DefaultIoUringIoRegistration> registrations;

// Reusable arrays sized for an IPv4 / IPv6 address, exposed through inet4AddressArray() / inet6AddressArray().
private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];

/** Set to {@code true} by wakeup() before it writes to the eventfd; reset when an eventfd read completes. */
private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
/** The eventfd that wakeup() writes to and that submitEventFdRead() reads from. */
private final FileDescriptor eventfd;
// Direct buffer (Long.BYTES in size) that receives the eventfd read, plus its native address.
private final CleanableDirectBuffer eventfdReadBufCleanable;
private final ByteBuffer eventfdReadBuf;
private final long eventfdReadBufAddress;
// Direct buffer (KERNEL_TIMESPEC_SIZE bytes) holding the timespec for timeout submissions, plus its address.
private final CleanableDirectBuffer timeoutMemoryCleanable;
private final ByteBuffer timeoutMemory;
private final long timeoutMemoryAddress;
// Shared scratch structures; both are cleared after every submit performed by this handler.
private final IovArray iovArray;
private final MsgHdrMemoryArray msgHdrMemoryArray;
/** Value returned by the last eventfd read submission; {@code 0} means no read is outstanding. */
private long eventfdReadSubmitted;
/** Set by drainEventFd(); once set, a completed eventfd read is not re-armed. */
private boolean eventFdClosing;
/** Set by prepareToDestroy(); register(...) rejects new handles once this is {@code true}. */
private volatile boolean shuttingDown;
/** Set by completeRingClose(); run(...) becomes a no-op once this is {@code true}. */
private boolean closeCompleted;
/** Counter used by nextRegistrationId() to hand out registration ids. */
private int nextRegistrationId = Integer.MIN_VALUE;

// Ids reserved for the handler itself; nextRegistrationId() never hands these out.
/** Id encoded into the user data of eventfd reads (and their cancellation). */
private static final int EVENTFD_ID = Integer.MAX_VALUE;
/** Id encoded into the user data of the handler's own timeout and NOP submissions. */
private static final int RINGFD_ID = EVENTFD_ID - 1;
/** Returned by a registration's submit(...) when the registration is no longer valid. */
private static final int INVALID_ID = 0;

/** Size in bytes of the timespec memory: two 8-byte fields. */
private static final int KERNEL_TIMESPEC_SIZE = 16;

// Byte offsets of the seconds and nanoseconds fields inside the timespec memory.
private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;

/** Executor used for "am I on the executor thread" checks and handed to every registration. */
private final ThreadAwareExecutor executor;
90
/**
 * Creates the ring, optionally registers io-wq worker limits, creates the configured buffer rings and
 * allocates the eventfd and the native scratch memory used by this handler.
 *
 * @param executor the executor used for thread checks and handed to registrations; must not be {@code null}
 * @param config   the handler configuration; must not be {@code null}
 * @throws UncheckedIOException if registering the worker limits or creating a buffer ring fails
 */
IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {

    IoUring.ensureAvailability();
    this.executor = requireNonNull(executor, "executor");
    requireNonNull(config, "config");
    int setupFlags = Native.setupFlags(config.singleIssuer());

    // The completion queue defaults to twice the ring size unless the configuration
    // asks for an explicit completion queue size.
    int cqSize = 2 * config.getRingSize();
    if (config.needSetupCqeSize()) {
        assert IoUring.isSetupCqeSizeSupported();
        setupFlags |= Native.IORING_SETUP_CQSIZE;
        cqSize = config.getCqSize();
    }
    this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
    if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
        // Negative configured values are clamped to 0 before they are passed to the native call.
        int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
        int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
        int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
        if (result < 0) {
            // Close the ring before propagating the failure so it is not leaked.
            ringBuffer.close();
            throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
        }
    }

    registeredIoUringBufferRing = new IntObjectHashMap<>();
    Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
    if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
        for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
            try {
                IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
                registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
            } catch (Errors.NativeIoException e) {
                // Roll back: close every buffer ring created so far, then the ring itself.
                for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
                    bufferRing.close();
                }

                ringBuffer.close();
                throw new UncheckedIOException(e);
            }
        }
    }

    // NOTE(review): nothing below is guarded, so a failure from here on would not close the ring
    // or the buffer rings created above - confirm whether these calls can throw in practice.
    registrations = new IntObjectHashMap<>();
    eventfd = Native.newBlockingEventFd();
    eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
    eventfdReadBuf = eventfdReadBufCleanable.buffer();
    eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
    timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
    timeoutMemory = timeoutMemoryCleanable.buffer();
    timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
    iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
    msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
}
147
148 @Override
149 public void initialize() {
150 ringBuffer.enable();
151
152 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
153 bufferRing.initialize();
154 }
155 }
156
/**
 * Runs one iteration of the handler: submits pending entries (waiting for completions when that is
 * allowed) and then processes the completions that are available.
 *
 * @param context supplies the blocking permission, the deadline and the active-I/O-time reporting hooks
 * @return the total reported by {@code processCompletionsAndHandleOverflow}, or {@code 0} once the
 *         ring has been closed
 */
@Override
public int run(IoHandlerContext context) {
    if (closeCompleted) {
        // The ring is already closed; there is nothing to do except report zero active time if asked.
        if (context.shouldReportActiveIoTime()) {
            context.reportActiveIoTime(0);
        }
        return 0;
    }
    SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
    CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
    // Only wait when no completion is queued already and the context permits blocking.
    if (!completionQueue.hasCompletions() && context.canBlock()) {
        // Ensure an eventfd read is outstanding before waiting; wakeup() signals through that eventfd.
        if (eventfdReadSubmitted == 0) {
            submitEventFdRead();
        }
        // -1 is passed through as "no timeout"; otherwise the remaining delay is used.
        long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
        submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
    } else {
        // Work is already available (or blocking is not allowed): submit without waiting.
        submitAndClearNow(submissionQueue);
    }

    int processed;
    if (context.shouldReportActiveIoTime()) {
        // Measure only the time spent processing completions, not the time spent waiting above.
        long activeIoStartTimeNanos = System.nanoTime();
        processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
        long activeIoEndTimeNanos = System.nanoTime();
        context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
    } else {
        processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
    }
    return processed;
}
190
/**
 * Processes completions in a bounded loop and submits any entries that the callbacks queued while
 * doing so.
 *
 * @param submissionQueue the submission queue of the ring
 * @param completionQueue the completion queue of the ring
 * @param callback        invoked by the completion queue for the completions it processes
 * @return the sum of the values returned by {@code completionQueue.process(callback)}
 */
private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
                                                CompletionCallback callback) {
    int processed = 0;

    // Bound the number of rounds so a single call cannot loop forever while new work keeps arriving.
    for (int i = 0; i < 128; i++) {
        int p = completionQueue.process(callback);
        if ((submissionQueue.flags() & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
            // The overflow flag is set: warn and submit without waiting, then keep processing.
            logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
                    completionQueue.ringEntries);
            submitAndClearNow(submissionQueue);
        } else if (p == 0 &&
                // Nothing was processed in this round; stop when there is nothing left to submit ...
                (submissionQueue.count() == 0 ||
                // ... or when the non-waiting submit submitted nothing and no completion showed up.
                // Note that submitAndClearNow(...) only runs when entries are pending (short-circuit).
                (submitAndClearNow(submissionQueue) == 0 && !completionQueue.hasCompletions()))) {
            break;
        }
        processed += p;
    }
    return processed;
}
214
215 private int submitAndClearNow(SubmissionQueue submissionQueue) {
216 int submitted = submissionQueue.submitAndGetNow();
217
218
219
220 iovArray.clear();
221 msgHdrMemoryArray.clear();
222 return submitted;
223 }
224
225 private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
226 throws Errors.NativeIoException {
227 short bufferRingSize = bufferRingConfig.bufferRingSize();
228 short bufferGroupId = bufferRingConfig.bufferGroupId();
229 int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
230 long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
231 if (ioUringBufRingAddr < 0) {
232 throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
233 }
234 return new IoUringBufferRing(ringFd,
235 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
236 bufferRingSize, bufferRingConfig.batchSize(),
237 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
238 bufferRingConfig.isBatchAllocation()
239 );
240 }
241
242 IoUringBufferRing findBufferRing(short bgId) {
243 IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
244 if (cached != null) {
245 return cached;
246 }
247 throw new IllegalArgumentException(
248 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
249 );
250 }
251
252 private static void handleLoopException(Throwable throwable) {
253 logger.warn("Unexpected exception in the IO event loop.", throwable);
254
255
256
257 try {
258 Thread.sleep(100);
259 } catch (InterruptedException ignore) {
260
261 }
262 }
263
264 private boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
265 try {
266 int id = UserData.decodeId(udata);
267 byte op = UserData.decodeOp(udata);
268 short data = UserData.decodeData(udata);
269
270 if (logger.isTraceEnabled()) {
271 logger.trace("completed(ring {}): {}(id={}, res={})",
272 ringBuffer.fd(), Native.opToStr(op), data, res);
273 }
274 if (id == EVENTFD_ID) {
275 handleEventFdRead();
276 return true;
277 }
278 if (id == RINGFD_ID) {
279
280 return true;
281 }
282 DefaultIoUringIoRegistration registration = registrations.get(id);
283 if (registration == null) {
284 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
285 Native.opToStr(op), id, res);
286 return true;
287 }
288 registration.handle(res, flags, op, data, extraCqeData);
289 return true;
290 } catch (Error e) {
291 throw e;
292 } catch (Throwable throwable) {
293 handleLoopException(throwable);
294 return true;
295 }
296 }
297
298 private void handleEventFdRead() {
299 eventfdReadSubmitted = 0;
300 if (!eventFdClosing) {
301 eventfdAsyncNotify.set(false);
302 submitEventFdRead();
303 }
304 }
305
306 private void submitEventFdRead() {
307 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
308 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
309
310 eventfdReadSubmitted = submissionQueue.addEventFdRead(
311 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
312 }
313
314 private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
315 boolean linkTimeout, long timeoutNanoSeconds) {
316 if (timeoutNanoSeconds != -1) {
317 long udata = UserData.encode(RINGFD_ID,
318 linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
319
320
321 long seconds, nanoSeconds;
322 if (timeoutNanoSeconds == 0) {
323 seconds = 0;
324 nanoSeconds = 0;
325 } else {
326 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
327 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
328 }
329
330 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
331 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
332 if (linkTimeout) {
333 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
334 } else {
335 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
336 }
337 }
338 int submitted = submissionQueue.submitAndGet();
339
340
341 iovArray.clear();
342 msgHdrMemoryArray.clear();
343 return submitted;
344 }
345
/**
 * First phase of shutdown: rejects new registrations, closes every registered handle and then
 * processes completions for as long as any are available.
 */
@Override
public void prepareToDestroy() {
    // From now on register(...) throws IllegalStateException.
    shuttingDown = true;
    CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
    SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();

    // Iterate over a snapshot of the registrations rather than the live map
    // (presumably because closing a handle can lead to removal from the map - verify).
    List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());

    for (DefaultIoUringIoRegistration registration: copy) {
        registration.close();
    }

    // Write to the eventfd so that an outstanding eventfd read completes.
    Native.eventFdWrite(eventfd.intValue(), 1L);

    // Queue a NOP carrying the IOSQE_IO_DRAIN flag, tagged with RINGFD_ID so handle(...) ignores it.
    long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
    submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);

    // Submit everything queued so far (waiting variant of submit).
    submissionQueue.submitAndGet();

    // Keep processing while completions are available, submitting whatever the callbacks queued.
    while (completionQueue.hasCompletions()) {
        processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
        if (submissionQueue.count() > 0) {
            submissionQueue.submitAndGetNow();
        }
    }
}
375
/**
 * Final phase of shutdown: drains the eventfd, submits a last NOP that is bounded by a 200 ms
 * link timeout, processes the resulting completions, closes the buffer rings and finally releases
 * the ring and all native memory.
 */
@Override
public void destroy() {
    SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
    CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
    drainEventFd();
    if (submissionQueue.remaining() < 2) {
        // Two entries are queued below (the NOP and its link timeout);
        // submit first when fewer than two slots are free.
        submissionQueue.submit();
    }

    long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);

    // The NOP carries IOSQE_IO_DRAIN and IOSQE_LINK; the link timeout queued by
    // submitAndWaitWithTimeout(..., true, ...) directly follows it in the queue.
    submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);

    submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
    completionQueue.process(this::handle);
    for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
        ioUringBufferRing.close();
    }
    completeRingClose();
}
402
403
404
405
406
/**
 * Prepares the eventfd for closing: if a wakeup write is pending it waits until the corresponding
 * eventfd read has completed, and afterwards cancels any eventfd read that is still outstanding.
 * Must be called at most once (asserted through {@code eventFdClosing}).
 */
private void drainEventFd() {
    CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
    SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
    assert !eventFdClosing;
    eventFdClosing = true;
    // Setting the flag to true makes any later wakeup() skip its eventfd write;
    // the previous value tells whether a write is already pending.
    boolean eventPending = eventfdAsyncNotify.getAndSet(true);
    if (eventPending) {
        // A write is pending, so a read is needed to consume it.
        // Make sure one is outstanding and submitted.
        while (eventfdReadSubmitted == 0) {
            submitEventFdRead();
            submissionQueue.submit();
        }

        // Wraps the regular handler and records when the eventfd completion has been seen.
        class DrainFdEventCallback implements CompletionCallback {
            boolean eventFdDrained;

            @Override
            public boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
                if (UserData.decodeId(udata) == EVENTFD_ID) {
                    eventFdDrained = true;
                }
                return IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
            }
        }
        final DrainFdEventCallback handler = new DrainFdEventCallback();
        completionQueue.process(handler);
        // Keep submitting and processing until the eventfd completion has been observed.
        while (!handler.eventFdDrained) {
            submissionQueue.submitAndGet();
            processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
        }
    }

    // Because eventFdClosing is set, a completed read is not re-armed by handleEventFdRead().
    // A read that is still outstanding at this point is cancelled explicitly.
    if (eventfdReadSubmitted != 0) {
        long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
        submissionQueue.addCancel(eventfdReadSubmitted, udata);
        eventfdReadSubmitted = 0;
        submissionQueue.submit();
    }
}
449
450 private void completeRingClose() {
451 if (closeCompleted) {
452
453 return;
454 }
455 closeCompleted = true;
456 ringBuffer.close();
457 try {
458 eventfd.close();
459 } catch (IOException e) {
460 logger.warn("Failed to close eventfd", e);
461 }
462 eventfdReadBufCleanable.clean();
463 timeoutMemoryCleanable.clean();
464 iovArray.release();
465 msgHdrMemoryArray.release();
466 }
467
468 @Override
469 public IoRegistration register(IoHandle handle) throws Exception {
470 IoUringIoHandle ioHandle = cast(handle);
471 if (shuttingDown) {
472 throw new IllegalStateException("IoUringIoHandler is shutting down");
473 }
474 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
475 for (;;) {
476 int id = nextRegistrationId();
477 DefaultIoUringIoRegistration old = registrations.put(id, registration);
478 if (old != null) {
479 assert old.handle != registration.handle;
480 registrations.put(id, old);
481 } else {
482 registration.setId(id);
483 ioHandle.registered();
484 break;
485 }
486 }
487
488 return registration;
489 }
490
491 private int nextRegistrationId() {
492 int id;
493 do {
494 id = nextRegistrationId++;
495 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
496 return id;
497 }
498
499 private final class DefaultIoUringIoRegistration implements IoRegistration {
500 private final AtomicBoolean canceled = new AtomicBoolean();
501 private final ThreadAwareExecutor executor;
502 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
503 final IoUringIoHandle handle;
504
505 private boolean removeLater;
506 private int outstandingCompletions;
507 private int id;
508
509 DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
510 this.executor = executor;
511 this.handle = handle;
512 }
513
514 void setId(int id) {
515 this.id = id;
516 }
517
518 @Override
519 public long submit(IoOps ops) {
520 IoUringIoOps ioOps = (IoUringIoOps) ops;
521 if (!isValid()) {
522 return INVALID_ID;
523 }
524 if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
525
526
527 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
528 }
529 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
530 if (executor.isExecutorThread(Thread.currentThread())) {
531 submit0(ioOps, udata);
532 } else {
533 executor.execute(() -> submit0(ioOps, udata));
534 }
535 return udata;
536 }
537
538 private void submit0(IoUringIoOps ioOps, long udata) {
539 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
540 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
541 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
542 );
543 outstandingCompletions++;
544 }
545
546 @SuppressWarnings("unchecked")
547 @Override
548 public <T> T attachment() {
549 return (T) IoUringIoHandler.this;
550 }
551
552 @Override
553 public boolean isValid() {
554 return !canceled.get();
555 }
556
557 @Override
558 public boolean cancel() {
559 if (!canceled.compareAndSet(false, true)) {
560
561 return false;
562 }
563 if (executor.isExecutorThread(Thread.currentThread())) {
564 tryRemove();
565 } else {
566 executor.execute(this::tryRemove);
567 }
568 return true;
569 }
570
571 private void tryRemove() {
572 if (outstandingCompletions > 0) {
573
574
575 removeLater = true;
576 return;
577 }
578 remove();
579 }
580
581 private void remove() {
582 DefaultIoUringIoRegistration old = registrations.remove(id);
583 assert old == this;
584 handle.unregistered();
585 }
586
587 void close() {
588
589
590 assert executor.isExecutorThread(Thread.currentThread());
591 try {
592 handle.close();
593 } catch (Exception e) {
594 logger.debug("Exception during closing " + handle, e);
595 }
596 }
597
598 void handle(int res, int flags, byte op, short data, ByteBuffer extraCqeData) {
599 event.update(res, flags, op, data, extraCqeData);
600 handle.handle(this, event);
601
602
603 if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
604
605 removeLater = false;
606 remove();
607 }
608 }
609 }
610
611 private static IoUringIoHandle cast(IoHandle handle) {
612 if (handle instanceof IoUringIoHandle) {
613 return (IoUringIoHandle) handle;
614 }
615 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
616 }
617
618 @Override
619 public void wakeup() {
620 if (!executor.isExecutorThread(Thread.currentThread()) &&
621 !eventfdAsyncNotify.getAndSet(true)) {
622
623 Native.eventFdWrite(eventfd.intValue(), 1L);
624 }
625 }
626
627 @Override
628 public boolean isCompatible(Class<? extends IoHandle> handleType) {
629 return IoUringIoHandle.class.isAssignableFrom(handleType);
630 }
631
632 IovArray iovArray() {
633 if (iovArray.isFull()) {
634
635 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
636 }
637 assert iovArray.count() == 0;
638 return iovArray;
639 }
640
641 MsgHdrMemoryArray msgHdrMemoryArray() {
642 if (msgHdrMemoryArray.isFull()) {
643
644 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
645 }
646 return msgHdrMemoryArray;
647 }
648
649
650
651
652 byte[] inet4AddressArray() {
653 return inet4AddressArray;
654 }
655
656
657
658
659 byte[] inet6AddressArray() {
660 return inet6AddressArray;
661 }
662
663
664
665
666
667
668 public static IoHandlerFactory newFactory() {
669 return newFactory(new IoUringIoHandlerConfig());
670 }
671
672
673
674
675
676
677
678
679 public static IoHandlerFactory newFactory(int ringSize) {
680 IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
681 configuration.setRingSize(ringSize);
682 return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
683 }
684
685
686
687
688
689
690
691 public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
692 IoUring.ensureAvailability();
693 final IoUringIoHandlerConfig copy = ObjectUtil.checkNotNull(config, "config").verifyAndClone();
694 return new IoHandlerFactory() {
695 @Override
696 public IoHandler newHandler(ThreadAwareExecutor eventLoop) {
697 return new IoUringIoHandler(eventLoop, copy);
698 }
699
700 @Override
701 public boolean isChangingThreadSupported() {
702 return !copy.singleIssuer();
703 }
704 };
705 }
706 }