1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.IoHandlerContext;
20 import io.netty.channel.IoHandle;
21 import io.netty.channel.IoHandler;
22 import io.netty.channel.IoHandlerFactory;
23 import io.netty.channel.IoOps;
24 import io.netty.channel.IoRegistration;
25 import io.netty.channel.unix.Buffer;
26 import io.netty.channel.unix.Errors;
27 import io.netty.channel.unix.FileDescriptor;
28 import io.netty.channel.unix.IovArray;
29 import io.netty.util.collection.IntObjectHashMap;
30 import io.netty.util.collection.IntObjectMap;
31 import io.netty.util.concurrent.ThreadAwareExecutor;
32 import io.netty.util.internal.ObjectUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.io.UncheckedIOException;
39 import java.nio.ByteBuffer;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static java.lang.Math.max;
47 import static java.lang.Math.min;
48 import static java.util.Objects.requireNonNull;
49
50
51
52
53 public final class IoUringIoHandler implements IoHandler {
54 private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55
56 private final RingBuffer ringBuffer;
57 private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58 private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59
60 private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61 private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62
63 private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64 private final FileDescriptor eventfd;
65 private final ByteBuffer eventfdReadBuf;
66 private final long eventfdReadBufAddress;
67 private final ByteBuffer timeoutMemory;
68 private final long timeoutMemoryAddress;
69 private final IovArray iovArray;
70 private long eventfdReadSubmitted;
71 private boolean eventFdClosing;
72 private volatile boolean shuttingDown;
73 private boolean closeCompleted;
74 private int nextRegistrationId = Integer.MIN_VALUE;
75
76
77 private static final int EVENTFD_ID = Integer.MAX_VALUE;
78 private static final int RINGFD_ID = EVENTFD_ID - 1;
79 private static final int INVALID_ID = 0;
80
81 private static final int KERNEL_TIMESPEC_SIZE = 16;
82
83 private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
84 private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
85
86 private final CompletionBuffer completionBuffer;
87 private final ThreadAwareExecutor executor;
88
89 IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
90
91 IoUring.ensureAvailability();
92 this.executor = requireNonNull(executor, "executor");
93 requireNonNull(config, "config");
94 int setupFlags = Native.setupFlags();
95
96
97
98 int cqSize = 2 * config.getRingSize();
99 if (config.needSetupCqeSize()) {
100 if (!IoUring.isSetupCqeSizeSupported()) {
101 throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
102 }
103 setupFlags |= Native.IORING_SETUP_CQSIZE;
104 cqSize = config.checkCqSize(config.getCqSize());
105 }
106 this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
107 if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
108 int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
109 int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
110 int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
111 if (result < 0) {
112
113 ringBuffer.close();
114 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
115 }
116 }
117
118 registeredIoUringBufferRing = new IntObjectHashMap<>();
119 Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
120 if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
121 if (!IoUring.isRegisterBufferRingSupported()) {
122
123 ringBuffer.close();
124 throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
125 }
126 for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
127 try {
128 IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
129 registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
130 } catch (Errors.NativeIoException e) {
131 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
132 bufferRing.close();
133 }
134
135 ringBuffer.close();
136 throw new UncheckedIOException(e);
137 }
138 }
139 }
140
141 registrations = new IntObjectHashMap<>();
142 eventfd = Native.newBlockingEventFd();
143 eventfdReadBuf = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
144 eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
145 this.timeoutMemory = Buffer.allocateDirectWithNativeOrder(KERNEL_TIMESPEC_SIZE);
146 this.timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
147
148
149 completionBuffer = new CompletionBuffer(ringBuffer.ioUringCompletionQueue().ringCapacity * 2, 0);
150
151 iovArray = new IovArray(Unpooled.wrappedBuffer(
152 Buffer.allocateDirectWithNativeOrder(IoUring.NUM_ELEMENTS_IOVEC * IovArray.IOV_SIZE))
153 .setIndex(0, 0));
154 }
155
156 @Override
157 public void initialize() {
158 ringBuffer.enable();
159
160 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
161 bufferRing.initialize();
162 }
163 }
164
165 @Override
166 public int run(IoHandlerContext context) {
167 if (closeCompleted) {
168 return 0;
169 }
170 int processedPerRun = 0;
171 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
172 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
173 if (!completionQueue.hasCompletions() && context.canBlock()) {
174 if (eventfdReadSubmitted == 0) {
175 submitEventFdRead();
176 }
177 long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
178 submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
179 } else {
180 submitAndClear(submissionQueue);
181 }
182 for (;;) {
183
184
185 int processed = drainAndProcessAll(completionQueue, this::handle);
186 processedPerRun += processed;
187
188
189
190
191 if (submitAndClear(submissionQueue) == 0 && processed == 0) {
192 break;
193 }
194 }
195
196 return processedPerRun;
197 }
198
199 void submitAndRunNow(long udata) {
200 if (closeCompleted) {
201 return;
202 }
203 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
204 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
205 if (submitAndClear(submissionQueue) > 0) {
206 completionBuffer.drain(completionQueue);
207 completionBuffer.processOneNow(this::handle, udata);
208 }
209 }
210
211 private int submitAndClear(SubmissionQueue submissionQueue) {
212 int submitted = submissionQueue.submit();
213
214
215
216 iovArray.clear();
217 return submitted;
218 }
219
220 private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
221 throws Errors.NativeIoException {
222 short bufferRingSize = bufferRingConfig.bufferRingSize();
223 short bufferGroupId = bufferRingConfig.bufferGroupId();
224 int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
225 long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
226 if (ioUringBufRingAddr < 0) {
227 throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
228 }
229 return new IoUringBufferRing(ringFd,
230 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
231 bufferRingSize, bufferRingConfig.batchSize(), bufferRingConfig.maxUnreleasedBuffers(),
232 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator()
233 );
234 }
235
236 IoUringBufferRing findBufferRing(short bgId) {
237 IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
238 if (cached != null) {
239 return cached;
240 }
241 throw new IllegalArgumentException(
242 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
243 );
244 }
245
246 private int drainAndProcessAll(CompletionQueue completionQueue, CompletionCallback callback) {
247 int processed = 0;
248 for (;;) {
249 boolean drainedAll = completionBuffer.drain(completionQueue);
250 processed += completionBuffer.processNow(callback);
251 if (drainedAll) {
252 break;
253 }
254 }
255 return processed;
256 }
257
258 private static void handleLoopException(Throwable throwable) {
259 logger.warn("Unexpected exception in the IO event loop.", throwable);
260
261
262
263 try {
264 Thread.sleep(100);
265 } catch (InterruptedException ignore) {
266
267 }
268 }
269
270 private boolean handle(int res, int flags, long udata) {
271 try {
272 int id = UserData.decodeId(udata);
273 byte op = UserData.decodeOp(udata);
274 short data = UserData.decodeData(udata);
275
276 if (logger.isTraceEnabled()) {
277 logger.trace("completed(ring {}): {}(id={}, res={})",
278 ringBuffer.fd(), Native.opToStr(op), data, res);
279 }
280 if (id == EVENTFD_ID) {
281 handleEventFdRead();
282 return true;
283 }
284 if (id == RINGFD_ID) {
285
286 return true;
287 }
288 DefaultIoUringIoRegistration registration = registrations.get(id);
289 if (registration == null) {
290 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
291 Native.opToStr(op), id, res);
292 return true;
293 }
294 registration.handle(res, flags, op, data);
295 return true;
296 } catch (Error e) {
297 throw e;
298 } catch (Throwable throwable) {
299 handleLoopException(throwable);
300 return true;
301 }
302 }
303
304 private void handleEventFdRead() {
305 eventfdReadSubmitted = 0;
306 if (!eventFdClosing) {
307 eventfdAsyncNotify.set(false);
308 submitEventFdRead();
309 }
310 }
311
312 private void submitEventFdRead() {
313 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
314 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
315
316 eventfdReadSubmitted = submissionQueue.addEventFdRead(
317 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
318 }
319
320 private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
321 boolean linkTimeout, long timeoutNanoSeconds) {
322 if (timeoutNanoSeconds != -1) {
323 long udata = UserData.encode(RINGFD_ID,
324 linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
325
326
327 long seconds, nanoSeconds;
328 if (timeoutNanoSeconds == 0) {
329 seconds = 0;
330 nanoSeconds = 0;
331 } else {
332 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
333 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
334 }
335
336 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
337 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
338 if (linkTimeout) {
339 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
340 } else {
341 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
342 }
343 }
344 int submitted = submissionQueue.submitAndWait();
345
346
347 iovArray.clear();
348 return submitted;
349 }
350
351 @Override
352 public void prepareToDestroy() {
353 shuttingDown = true;
354 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
355 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
356
357 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
358
359 for (DefaultIoUringIoRegistration registration: copy) {
360 registration.close();
361 }
362
363
364 Native.eventFdWrite(eventfd.intValue(), 1L);
365
366
367 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
368 submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
369
370
371 submissionQueue.submitAndWait();
372 while (completionQueue.hasCompletions()) {
373 completionQueue.process(this::handle);
374
375 if (submissionQueue.count() > 0) {
376 submissionQueue.submit();
377 }
378 }
379 }
380
381 @Override
382 public void destroy() {
383 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
384 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
385 drainEventFd();
386 if (submissionQueue.remaining() < 2) {
387
388
389 submissionQueue.submit();
390 }
391
392 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
393
394
395
396
397
398 submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
399
400 submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
401 completionQueue.process(this::handle);
402 for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
403 ioUringBufferRing.close();
404 }
405 completeRingClose();
406 }
407
408
409
410
411
412 private void drainEventFd() {
413 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
414 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
415 assert !eventFdClosing;
416 eventFdClosing = true;
417 boolean eventPending = eventfdAsyncNotify.getAndSet(true);
418 if (eventPending) {
419
420
421 while (eventfdReadSubmitted == 0) {
422 submitEventFdRead();
423 submissionQueue.submit();
424 }
425
426 class DrainFdEventCallback implements CompletionCallback {
427 boolean eventFdDrained;
428
429 @Override
430 public boolean handle(int res, int flags, long udata) {
431 if (UserData.decodeId(udata) == EVENTFD_ID) {
432 eventFdDrained = true;
433 }
434 return IoUringIoHandler.this.handle(res, flags, udata);
435 }
436 }
437 final DrainFdEventCallback handler = new DrainFdEventCallback();
438 drainAndProcessAll(completionQueue, handler);
439 completionQueue.process(handler);
440 while (!handler.eventFdDrained) {
441 submissionQueue.submitAndWait();
442 drainAndProcessAll(completionQueue, handler);
443 }
444 }
445
446
447
448 if (eventfdReadSubmitted != 0) {
449 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
450 submissionQueue.addCancel(eventfdReadSubmitted, udata);
451 eventfdReadSubmitted = 0;
452 submissionQueue.submit();
453 }
454 }
455
456 private void completeRingClose() {
457 if (closeCompleted) {
458
459 return;
460 }
461 closeCompleted = true;
462 ringBuffer.close();
463 try {
464 eventfd.close();
465 } catch (IOException e) {
466 logger.warn("Failed to close eventfd", e);
467 }
468 Buffer.free(eventfdReadBuf);
469 Buffer.free(timeoutMemory);
470 iovArray.release();
471 }
472
473 @Override
474 public IoRegistration register(IoHandle handle) throws Exception {
475 IoUringIoHandle ioHandle = cast(handle);
476 if (shuttingDown) {
477 throw new IllegalStateException("IoUringIoHandler is shutting down");
478 }
479 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
480 for (;;) {
481 int id = nextRegistrationId();
482 DefaultIoUringIoRegistration old = registrations.put(id, registration);
483 if (old != null) {
484 assert old.handle != registration.handle;
485 registrations.put(id, old);
486 } else {
487 registration.setId(id);
488 break;
489 }
490 }
491
492 return registration;
493 }
494
495 private int nextRegistrationId() {
496 int id;
497 do {
498 id = nextRegistrationId++;
499 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
500 return id;
501 }
502
503 private final class DefaultIoUringIoRegistration implements IoRegistration {
504 private final AtomicBoolean canceled = new AtomicBoolean();
505 private final ThreadAwareExecutor executor;
506 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
507 final IoUringIoHandle handle;
508
509 private boolean removeLater;
510 private int outstandingCompletions;
511 private int id;
512
513 DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
514 this.executor = executor;
515 this.handle = handle;
516 }
517
518 void setId(int id) {
519 this.id = id;
520 }
521
522 @Override
523 public long submit(IoOps ops) {
524 IoUringIoOps ioOps = (IoUringIoOps) ops;
525 if (!isValid()) {
526 return INVALID_ID;
527 }
528 if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
529
530
531 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
532 }
533 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
534 if (executor.isExecutorThread(Thread.currentThread())) {
535 submit0(ioOps, udata);
536 } else {
537 executor.execute(() -> submit0(ioOps, udata));
538 }
539 return udata;
540 }
541
542 private void submit0(IoUringIoOps ioOps, long udata) {
543 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
544 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
545 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
546 );
547 outstandingCompletions++;
548 }
549
550 @SuppressWarnings("unchecked")
551 @Override
552 public <T> T attachment() {
553 return (T) IoUringIoHandler.this;
554 }
555
556 @Override
557 public boolean isValid() {
558 return !canceled.get();
559 }
560
561 @Override
562 public boolean cancel() {
563 if (!canceled.compareAndSet(false, true)) {
564
565 return false;
566 }
567 if (executor.isExecutorThread(Thread.currentThread())) {
568 tryRemove();
569 } else {
570 executor.execute(this::tryRemove);
571 }
572 return true;
573 }
574
575 private void tryRemove() {
576 if (outstandingCompletions > 0) {
577
578
579 removeLater = true;
580 return;
581 }
582 remove();
583 }
584
585 private void remove() {
586 DefaultIoUringIoRegistration old = registrations.remove(id);
587 assert old == this;
588 }
589
590 void close() {
591
592
593 assert executor.isExecutorThread(Thread.currentThread());
594 try {
595 handle.close();
596 } catch (Exception e) {
597 logger.debug("Exception during closing " + handle, e);
598 }
599 }
600
601 void handle(int res, int flags, byte op, short data) {
602 event.update(res, flags, op, data);
603 handle.handle(this, event);
604
605
606 if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
607
608 removeLater = false;
609 remove();
610 }
611 }
612 }
613
614 private static IoUringIoHandle cast(IoHandle handle) {
615 if (handle instanceof IoUringIoHandle) {
616 return (IoUringIoHandle) handle;
617 }
618 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
619 }
620
621 @Override
622 public void wakeup() {
623 if (!executor.isExecutorThread(Thread.currentThread()) &&
624 !eventfdAsyncNotify.getAndSet(true)) {
625
626 Native.eventFdWrite(eventfd.intValue(), 1L);
627 }
628 }
629
630 @Override
631 public boolean isCompatible(Class<? extends IoHandle> handleType) {
632 return IoUringIoHandle.class.isAssignableFrom(handleType);
633 }
634
635 IovArray iovArray() {
636 if (iovArray.isFull()) {
637
638 submitAndClear(ringBuffer.ioUringSubmissionQueue());
639 }
640 assert iovArray.count() == 0;
641 return iovArray;
642 }
643
644
645
646
647 byte[] inet4AddressArray() {
648 return inet4AddressArray;
649 }
650
651
652
653
654 byte[] inet6AddressArray() {
655 return inet6AddressArray;
656 }
657
658
659
660
661
662
663 public static IoHandlerFactory newFactory() {
664 return newFactory(new IoUringIoHandlerConfig());
665 }
666
667
668
669
670
671
672
673
674 public static IoHandlerFactory newFactory(int ringSize) {
675 IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
676 configuration.setRingSize(ringSize);
677 return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
678 }
679
680
681
682
683
684
685
686 public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
687 IoUring.ensureAvailability();
688 ObjectUtil.checkNotNull(config, "config");
689 return eventLoop -> new IoUringIoHandler(eventLoop, config);
690 }
691 }