1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoHandlerContext;
19 import io.netty.channel.IoHandle;
20 import io.netty.channel.IoHandler;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.channel.unix.Buffer;
25 import io.netty.channel.unix.Errors;
26 import io.netty.channel.unix.FileDescriptor;
27 import io.netty.channel.unix.IovArray;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.ThreadAwareExecutor;
31 import io.netty.util.internal.CleanableDirectBuffer;
32 import io.netty.util.internal.ObjectUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.io.UncheckedIOException;
39 import java.nio.ByteBuffer;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static java.lang.Math.max;
47 import static java.lang.Math.min;
48 import static java.util.Objects.requireNonNull;
49
50
51
52
53 public final class IoUringIoHandler implements IoHandler {
54 private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55
56 private final RingBuffer ringBuffer;
57 private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58 private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59
60 private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61 private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62
63 private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64 private final FileDescriptor eventfd;
65 private final CleanableDirectBuffer eventfdReadBufCleanable;
66 private final ByteBuffer eventfdReadBuf;
67 private final long eventfdReadBufAddress;
68 private final CleanableDirectBuffer timeoutMemoryCleanable;
69 private final ByteBuffer timeoutMemory;
70 private final long timeoutMemoryAddress;
71 private final IovArray iovArray;
72 private final MsgHdrMemoryArray msgHdrMemoryArray;
73 private long eventfdReadSubmitted;
74 private boolean eventFdClosing;
75 private volatile boolean shuttingDown;
76 private boolean closeCompleted;
77 private int nextRegistrationId = Integer.MIN_VALUE;
78
79
80 private static final int EVENTFD_ID = Integer.MAX_VALUE;
81 private static final int RINGFD_ID = EVENTFD_ID - 1;
82 private static final int INVALID_ID = 0;
83
84 private static final int KERNEL_TIMESPEC_SIZE = 16;
85
86 private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
87 private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
88
89 private final ThreadAwareExecutor executor;
90
91 IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
92
93 IoUring.ensureAvailability();
94 this.executor = requireNonNull(executor, "executor");
95 requireNonNull(config, "config");
96 int setupFlags = Native.setupFlags();
97
98
99
100 int cqSize = 2 * config.getRingSize();
101 if (config.needSetupCqeSize()) {
102 if (!IoUring.isSetupCqeSizeSupported()) {
103 throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
104 }
105 setupFlags |= Native.IORING_SETUP_CQSIZE;
106 cqSize = config.checkCqSize(config.getCqSize());
107 }
108 this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
109 if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
110 int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
111 int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
112 int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
113 if (result < 0) {
114
115 ringBuffer.close();
116 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
117 }
118 }
119
120 registeredIoUringBufferRing = new IntObjectHashMap<>();
121 Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
122 if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
123 if (!IoUring.isRegisterBufferRingSupported()) {
124
125 ringBuffer.close();
126 throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
127 }
128 for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
129 try {
130 IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
131 registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
132 } catch (Errors.NativeIoException e) {
133 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
134 bufferRing.close();
135 }
136
137 ringBuffer.close();
138 throw new UncheckedIOException(e);
139 }
140 }
141 }
142
143 registrations = new IntObjectHashMap<>();
144 eventfd = Native.newBlockingEventFd();
145 eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
146 eventfdReadBuf = eventfdReadBufCleanable.buffer();
147 eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
148 timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
149 timeoutMemory = timeoutMemoryCleanable.buffer();
150 timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
151 iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
152 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
153 }
154
155 @Override
156 public void initialize() {
157 ringBuffer.enable();
158
159 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
160 bufferRing.initialize();
161 }
162 }
163
164 @Override
165 public int run(IoHandlerContext context) {
166 if (closeCompleted) {
167 return 0;
168 }
169 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
170 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
171 if (!completionQueue.hasCompletions() && context.canBlock()) {
172 if (eventfdReadSubmitted == 0) {
173 submitEventFdRead();
174 }
175 long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
176 submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
177 } else {
178
179 submitAndClearNow(submissionQueue);
180 }
181 return processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
182 }
183
184 private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
185 CompletionCallback callback) {
186 int processed = 0;
187
188
189 for (int i = 0; i < 128; i++) {
190 int p = completionQueue.process(callback);
191 if ((submissionQueue.flags() & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
192 logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
193 completionQueue.ringEntries);
194 submitAndClearNow(submissionQueue);
195 } else if (p == 0 &&
196
197
198 submitAndClearNow(submissionQueue) == 0 && !completionQueue.hasCompletions()) {
199 break;
200 }
201
202 processed += p;
203 }
204 return processed;
205 }
206
207 private int submitAndClearNow(SubmissionQueue submissionQueue) {
208 int submitted = submissionQueue.submitAndGetNow();
209
210
211
212 iovArray.clear();
213 msgHdrMemoryArray.clear();
214 return submitted;
215 }
216
217 private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
218 throws Errors.NativeIoException {
219 short bufferRingSize = bufferRingConfig.bufferRingSize();
220 short bufferGroupId = bufferRingConfig.bufferGroupId();
221 int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
222 long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
223 if (ioUringBufRingAddr < 0) {
224 throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
225 }
226 return new IoUringBufferRing(ringFd,
227 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
228 bufferRingSize, bufferRingConfig.batchSize(),
229 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
230 bufferRingConfig.isBatchAllocation()
231 );
232 }
233
234 IoUringBufferRing findBufferRing(short bgId) {
235 IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
236 if (cached != null) {
237 return cached;
238 }
239 throw new IllegalArgumentException(
240 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
241 );
242 }
243
244 private static void handleLoopException(Throwable throwable) {
245 logger.warn("Unexpected exception in the IO event loop.", throwable);
246
247
248
249 try {
250 Thread.sleep(100);
251 } catch (InterruptedException ignore) {
252
253 }
254 }
255
256 private boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
257 try {
258 int id = UserData.decodeId(udata);
259 byte op = UserData.decodeOp(udata);
260 short data = UserData.decodeData(udata);
261
262 if (logger.isTraceEnabled()) {
263 logger.trace("completed(ring {}): {}(id={}, res={})",
264 ringBuffer.fd(), Native.opToStr(op), data, res);
265 }
266 if (id == EVENTFD_ID) {
267 handleEventFdRead();
268 return true;
269 }
270 if (id == RINGFD_ID) {
271
272 return true;
273 }
274 DefaultIoUringIoRegistration registration = registrations.get(id);
275 if (registration == null) {
276 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
277 Native.opToStr(op), id, res);
278 return true;
279 }
280 registration.handle(res, flags, op, data, extraCqeData);
281 return true;
282 } catch (Error e) {
283 throw e;
284 } catch (Throwable throwable) {
285 handleLoopException(throwable);
286 return true;
287 }
288 }
289
290 private void handleEventFdRead() {
291 eventfdReadSubmitted = 0;
292 if (!eventFdClosing) {
293 eventfdAsyncNotify.set(false);
294 submitEventFdRead();
295 }
296 }
297
298 private void submitEventFdRead() {
299 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
300 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
301
302 eventfdReadSubmitted = submissionQueue.addEventFdRead(
303 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
304 }
305
306 private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
307 boolean linkTimeout, long timeoutNanoSeconds) {
308 if (timeoutNanoSeconds != -1) {
309 long udata = UserData.encode(RINGFD_ID,
310 linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
311
312
313 long seconds, nanoSeconds;
314 if (timeoutNanoSeconds == 0) {
315 seconds = 0;
316 nanoSeconds = 0;
317 } else {
318 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
319 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
320 }
321
322 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
323 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
324 if (linkTimeout) {
325 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
326 } else {
327 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
328 }
329 }
330 int submitted = submissionQueue.submitAndGet();
331
332
333 iovArray.clear();
334 msgHdrMemoryArray.clear();
335 return submitted;
336 }
337
338 @Override
339 public void prepareToDestroy() {
340 shuttingDown = true;
341 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
342 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
343
344 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
345
346 for (DefaultIoUringIoRegistration registration: copy) {
347 registration.close();
348 }
349
350
351 Native.eventFdWrite(eventfd.intValue(), 1L);
352
353
354 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
355 submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
356
357
358 submissionQueue.submitAndGet();
359
360 while (completionQueue.hasCompletions()) {
361 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
362 if (submissionQueue.count() > 0) {
363 submissionQueue.submitAndGetNow();
364 }
365 }
366 }
367
368 @Override
369 public void destroy() {
370 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
371 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
372 drainEventFd();
373 if (submissionQueue.remaining() < 2) {
374
375
376 submissionQueue.submit();
377 }
378
379 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
380
381
382
383
384
385 submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
386
387 submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
388 completionQueue.process(this::handle);
389 for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
390 ioUringBufferRing.close();
391 }
392 completeRingClose();
393 }
394
395
396
397
398
399 private void drainEventFd() {
400 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
401 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
402 assert !eventFdClosing;
403 eventFdClosing = true;
404 boolean eventPending = eventfdAsyncNotify.getAndSet(true);
405 if (eventPending) {
406
407
408 while (eventfdReadSubmitted == 0) {
409 submitEventFdRead();
410 submissionQueue.submit();
411 }
412
413 class DrainFdEventCallback implements CompletionCallback {
414 boolean eventFdDrained;
415
416 @Override
417 public boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
418 if (UserData.decodeId(udata) == EVENTFD_ID) {
419 eventFdDrained = true;
420 }
421 return IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
422 }
423 }
424 final DrainFdEventCallback handler = new DrainFdEventCallback();
425 completionQueue.process(handler);
426 while (!handler.eventFdDrained) {
427 submissionQueue.submitAndGet();
428 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
429 }
430 }
431
432
433
434 if (eventfdReadSubmitted != 0) {
435 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
436 submissionQueue.addCancel(eventfdReadSubmitted, udata);
437 eventfdReadSubmitted = 0;
438 submissionQueue.submit();
439 }
440 }
441
442 private void completeRingClose() {
443 if (closeCompleted) {
444
445 return;
446 }
447 closeCompleted = true;
448 ringBuffer.close();
449 try {
450 eventfd.close();
451 } catch (IOException e) {
452 logger.warn("Failed to close eventfd", e);
453 }
454 eventfdReadBufCleanable.clean();
455 timeoutMemoryCleanable.clean();
456 iovArray.release();
457 msgHdrMemoryArray.release();
458 }
459
460 @Override
461 public IoRegistration register(IoHandle handle) throws Exception {
462 IoUringIoHandle ioHandle = cast(handle);
463 if (shuttingDown) {
464 throw new IllegalStateException("IoUringIoHandler is shutting down");
465 }
466 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
467 for (;;) {
468 int id = nextRegistrationId();
469 DefaultIoUringIoRegistration old = registrations.put(id, registration);
470 if (old != null) {
471 assert old.handle != registration.handle;
472 registrations.put(id, old);
473 } else {
474 registration.setId(id);
475 break;
476 }
477 }
478
479 return registration;
480 }
481
482 private int nextRegistrationId() {
483 int id;
484 do {
485 id = nextRegistrationId++;
486 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
487 return id;
488 }
489
490 private final class DefaultIoUringIoRegistration implements IoRegistration {
491 private final AtomicBoolean canceled = new AtomicBoolean();
492 private final ThreadAwareExecutor executor;
493 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
494 final IoUringIoHandle handle;
495
496 private boolean removeLater;
497 private int outstandingCompletions;
498 private int id;
499
500 DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
501 this.executor = executor;
502 this.handle = handle;
503 }
504
505 void setId(int id) {
506 this.id = id;
507 }
508
509 @Override
510 public long submit(IoOps ops) {
511 IoUringIoOps ioOps = (IoUringIoOps) ops;
512 if (!isValid()) {
513 return INVALID_ID;
514 }
515 if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
516
517
518 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
519 }
520 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
521 if (executor.isExecutorThread(Thread.currentThread())) {
522 submit0(ioOps, udata);
523 } else {
524 executor.execute(() -> submit0(ioOps, udata));
525 }
526 return udata;
527 }
528
529 private void submit0(IoUringIoOps ioOps, long udata) {
530 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
531 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
532 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
533 );
534 outstandingCompletions++;
535 }
536
537 @SuppressWarnings("unchecked")
538 @Override
539 public <T> T attachment() {
540 return (T) IoUringIoHandler.this;
541 }
542
543 @Override
544 public boolean isValid() {
545 return !canceled.get();
546 }
547
548 @Override
549 public boolean cancel() {
550 if (!canceled.compareAndSet(false, true)) {
551
552 return false;
553 }
554 if (executor.isExecutorThread(Thread.currentThread())) {
555 tryRemove();
556 } else {
557 executor.execute(this::tryRemove);
558 }
559 return true;
560 }
561
562 private void tryRemove() {
563 if (outstandingCompletions > 0) {
564
565
566 removeLater = true;
567 return;
568 }
569 remove();
570 }
571
572 private void remove() {
573 DefaultIoUringIoRegistration old = registrations.remove(id);
574 assert old == this;
575 }
576
577 void close() {
578
579
580 assert executor.isExecutorThread(Thread.currentThread());
581 try {
582 handle.close();
583 } catch (Exception e) {
584 logger.debug("Exception during closing " + handle, e);
585 }
586 }
587
588 void handle(int res, int flags, byte op, short data, ByteBuffer extraCqeData) {
589 event.update(res, flags, op, data, extraCqeData);
590 handle.handle(this, event);
591
592
593 if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
594
595 removeLater = false;
596 remove();
597 }
598 }
599 }
600
601 private static IoUringIoHandle cast(IoHandle handle) {
602 if (handle instanceof IoUringIoHandle) {
603 return (IoUringIoHandle) handle;
604 }
605 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
606 }
607
608 @Override
609 public void wakeup() {
610 if (!executor.isExecutorThread(Thread.currentThread()) &&
611 !eventfdAsyncNotify.getAndSet(true)) {
612
613 Native.eventFdWrite(eventfd.intValue(), 1L);
614 }
615 }
616
617 @Override
618 public boolean isCompatible(Class<? extends IoHandle> handleType) {
619 return IoUringIoHandle.class.isAssignableFrom(handleType);
620 }
621
622 IovArray iovArray() {
623 if (iovArray.isFull()) {
624
625 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
626 }
627 assert iovArray.count() == 0;
628 return iovArray;
629 }
630
631 MsgHdrMemoryArray msgHdrMemoryArray() {
632 if (msgHdrMemoryArray.isFull()) {
633
634 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
635 }
636 return msgHdrMemoryArray;
637 }
638
639
640
641
642 byte[] inet4AddressArray() {
643 return inet4AddressArray;
644 }
645
646
647
648
649 byte[] inet6AddressArray() {
650 return inet6AddressArray;
651 }
652
653
654
655
656
657
658 public static IoHandlerFactory newFactory() {
659 return newFactory(new IoUringIoHandlerConfig());
660 }
661
662
663
664
665
666
667
668
669 public static IoHandlerFactory newFactory(int ringSize) {
670 IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
671 configuration.setRingSize(ringSize);
672 return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
673 }
674
675
676
677
678
679
680
681 public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
682 IoUring.ensureAvailability();
683 ObjectUtil.checkNotNull(config, "config");
684 return eventLoop -> new IoUringIoHandler(eventLoop, config);
685 }
686 }