1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoHandlerContext;
19 import io.netty.channel.IoHandle;
20 import io.netty.channel.IoHandler;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.channel.unix.Buffer;
25 import io.netty.channel.unix.Errors;
26 import io.netty.channel.unix.FileDescriptor;
27 import io.netty.channel.unix.IovArray;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.ThreadAwareExecutor;
31 import io.netty.util.internal.CleanableDirectBuffer;
32 import io.netty.util.internal.ObjectUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.io.UncheckedIOException;
39 import java.nio.ByteBuffer;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static java.lang.Math.max;
47 import static java.lang.Math.min;
48 import static java.util.Objects.requireNonNull;
49
50
51
52
53 public final class IoUringIoHandler implements IoHandler {
54 private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55
56 private final RingBuffer ringBuffer;
57 private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58 private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59
60 private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61 private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62
63 private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64 private final FileDescriptor eventfd;
65 private final CleanableDirectBuffer eventfdReadBufCleanable;
66 private final ByteBuffer eventfdReadBuf;
67 private final long eventfdReadBufAddress;
68 private final CleanableDirectBuffer timeoutMemoryCleanable;
69 private final ByteBuffer timeoutMemory;
70 private final long timeoutMemoryAddress;
71 private final IovArray iovArray;
72 private long eventfdReadSubmitted;
73 private boolean eventFdClosing;
74 private volatile boolean shuttingDown;
75 private boolean closeCompleted;
76 private int nextRegistrationId = Integer.MIN_VALUE;
77
78
79 private static final int EVENTFD_ID = Integer.MAX_VALUE;
80 private static final int RINGFD_ID = EVENTFD_ID - 1;
81 private static final int INVALID_ID = 0;
82
83 private static final int KERNEL_TIMESPEC_SIZE = 16;
84
85 private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
86 private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
87
88 private final ThreadAwareExecutor executor;
89
90 IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
91
92 IoUring.ensureAvailability();
93 this.executor = requireNonNull(executor, "executor");
94 requireNonNull(config, "config");
95 int setupFlags = Native.setupFlags();
96
97
98
99 int cqSize = 2 * config.getRingSize();
100 if (config.needSetupCqeSize()) {
101 if (!IoUring.isSetupCqeSizeSupported()) {
102 throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
103 }
104 setupFlags |= Native.IORING_SETUP_CQSIZE;
105 cqSize = config.checkCqSize(config.getCqSize());
106 }
107 this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
108 if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
109 int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
110 int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
111 int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
112 if (result < 0) {
113
114 ringBuffer.close();
115 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
116 }
117 }
118
119 registeredIoUringBufferRing = new IntObjectHashMap<>();
120 Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
121 if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
122 if (!IoUring.isRegisterBufferRingSupported()) {
123
124 ringBuffer.close();
125 throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
126 }
127 for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
128 try {
129 IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
130 registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
131 } catch (Errors.NativeIoException e) {
132 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
133 bufferRing.close();
134 }
135
136 ringBuffer.close();
137 throw new UncheckedIOException(e);
138 }
139 }
140 }
141
142 registrations = new IntObjectHashMap<>();
143 eventfd = Native.newBlockingEventFd();
144 eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
145 eventfdReadBuf = eventfdReadBufCleanable.buffer();
146 eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
147 timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
148 timeoutMemory = timeoutMemoryCleanable.buffer();
149 timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
150 iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
151 }
152
153 @Override
154 public void initialize() {
155 ringBuffer.enable();
156
157 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
158 bufferRing.initialize();
159 }
160 }
161
162 @Override
163 public int run(IoHandlerContext context) {
164 if (closeCompleted) {
165 return 0;
166 }
167 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
168 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
169 if (!completionQueue.hasCompletions() && context.canBlock()) {
170 if (eventfdReadSubmitted == 0) {
171 submitEventFdRead();
172 }
173 long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
174 submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
175 } else {
176
177 submitAndClearNow(submissionQueue);
178 }
179 return completionQueue.process(this::handle);
180 }
181
182 private int submitAndClearNow(SubmissionQueue submissionQueue) {
183 int submitted = submissionQueue.submitAndGetNow();
184
185
186
187 iovArray.clear();
188 return submitted;
189 }
190
191 private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
192 throws Errors.NativeIoException {
193 short bufferRingSize = bufferRingConfig.bufferRingSize();
194 short bufferGroupId = bufferRingConfig.bufferGroupId();
195 int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
196 long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
197 if (ioUringBufRingAddr < 0) {
198 throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
199 }
200 return new IoUringBufferRing(ringFd,
201 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
202 bufferRingSize, bufferRingConfig.batchSize(),
203 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
204 bufferRingConfig.isBatchAllocation()
205 );
206 }
207
208 IoUringBufferRing findBufferRing(short bgId) {
209 IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
210 if (cached != null) {
211 return cached;
212 }
213 throw new IllegalArgumentException(
214 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
215 );
216 }
217
218 private static void handleLoopException(Throwable throwable) {
219 logger.warn("Unexpected exception in the IO event loop.", throwable);
220
221
222
223 try {
224 Thread.sleep(100);
225 } catch (InterruptedException ignore) {
226
227 }
228 }
229
230 private boolean handle(int res, int flags, long udata) {
231 try {
232 int id = UserData.decodeId(udata);
233 byte op = UserData.decodeOp(udata);
234 short data = UserData.decodeData(udata);
235
236 if (logger.isTraceEnabled()) {
237 logger.trace("completed(ring {}): {}(id={}, res={})",
238 ringBuffer.fd(), Native.opToStr(op), data, res);
239 }
240 if (id == EVENTFD_ID) {
241 handleEventFdRead();
242 return true;
243 }
244 if (id == RINGFD_ID) {
245
246 return true;
247 }
248 DefaultIoUringIoRegistration registration = registrations.get(id);
249 if (registration == null) {
250 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
251 Native.opToStr(op), id, res);
252 return true;
253 }
254 registration.handle(res, flags, op, data);
255 return true;
256 } catch (Error e) {
257 throw e;
258 } catch (Throwable throwable) {
259 handleLoopException(throwable);
260 return true;
261 }
262 }
263
264 private void handleEventFdRead() {
265 eventfdReadSubmitted = 0;
266 if (!eventFdClosing) {
267 eventfdAsyncNotify.set(false);
268 submitEventFdRead();
269 }
270 }
271
272 private void submitEventFdRead() {
273 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
274 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
275
276 eventfdReadSubmitted = submissionQueue.addEventFdRead(
277 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
278 }
279
280 private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
281 boolean linkTimeout, long timeoutNanoSeconds) {
282 if (timeoutNanoSeconds != -1) {
283 long udata = UserData.encode(RINGFD_ID,
284 linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
285
286
287 long seconds, nanoSeconds;
288 if (timeoutNanoSeconds == 0) {
289 seconds = 0;
290 nanoSeconds = 0;
291 } else {
292 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
293 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
294 }
295
296 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
297 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
298 if (linkTimeout) {
299 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
300 } else {
301 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
302 }
303 }
304 int submitted = submissionQueue.submitAndGet();
305
306
307 iovArray.clear();
308 return submitted;
309 }
310
311 @Override
312 public void prepareToDestroy() {
313 shuttingDown = true;
314 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
315 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
316
317 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
318
319 for (DefaultIoUringIoRegistration registration: copy) {
320 registration.close();
321 }
322
323
324 Native.eventFdWrite(eventfd.intValue(), 1L);
325
326
327 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
328 submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
329
330
331 submissionQueue.submitAndGet();
332 while (completionQueue.hasCompletions()) {
333 completionQueue.process(this::handle);
334
335 if (submissionQueue.count() > 0) {
336 submissionQueue.submitAndGetNow();
337 }
338 }
339 }
340
341 @Override
342 public void destroy() {
343 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
344 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
345 drainEventFd();
346 if (submissionQueue.remaining() < 2) {
347
348
349 submissionQueue.submit();
350 }
351
352 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
353
354
355
356
357
358 submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
359
360 submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
361 completionQueue.process(this::handle);
362 for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
363 ioUringBufferRing.close();
364 }
365 completeRingClose();
366 }
367
368
369
370
371
372 private void drainEventFd() {
373 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
374 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
375 assert !eventFdClosing;
376 eventFdClosing = true;
377 boolean eventPending = eventfdAsyncNotify.getAndSet(true);
378 if (eventPending) {
379
380
381 while (eventfdReadSubmitted == 0) {
382 submitEventFdRead();
383 submissionQueue.submit();
384 }
385
386 class DrainFdEventCallback implements CompletionCallback {
387 boolean eventFdDrained;
388
389 @Override
390 public boolean handle(int res, int flags, long udata) {
391 if (UserData.decodeId(udata) == EVENTFD_ID) {
392 eventFdDrained = true;
393 }
394 return IoUringIoHandler.this.handle(res, flags, udata);
395 }
396 }
397 final DrainFdEventCallback handler = new DrainFdEventCallback();
398 completionQueue.process(handler);
399 while (!handler.eventFdDrained) {
400 submissionQueue.submitAndGet();
401 completionQueue.process(handler);
402 }
403 }
404
405
406
407 if (eventfdReadSubmitted != 0) {
408 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
409 submissionQueue.addCancel(eventfdReadSubmitted, udata);
410 eventfdReadSubmitted = 0;
411 submissionQueue.submit();
412 }
413 }
414
415 private void completeRingClose() {
416 if (closeCompleted) {
417
418 return;
419 }
420 closeCompleted = true;
421 ringBuffer.close();
422 try {
423 eventfd.close();
424 } catch (IOException e) {
425 logger.warn("Failed to close eventfd", e);
426 }
427 eventfdReadBufCleanable.clean();
428 timeoutMemoryCleanable.clean();
429 iovArray.release();
430 }
431
432 @Override
433 public IoRegistration register(IoHandle handle) throws Exception {
434 IoUringIoHandle ioHandle = cast(handle);
435 if (shuttingDown) {
436 throw new IllegalStateException("IoUringIoHandler is shutting down");
437 }
438 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
439 for (;;) {
440 int id = nextRegistrationId();
441 DefaultIoUringIoRegistration old = registrations.put(id, registration);
442 if (old != null) {
443 assert old.handle != registration.handle;
444 registrations.put(id, old);
445 } else {
446 registration.setId(id);
447 break;
448 }
449 }
450
451 return registration;
452 }
453
454 private int nextRegistrationId() {
455 int id;
456 do {
457 id = nextRegistrationId++;
458 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
459 return id;
460 }
461
462 private final class DefaultIoUringIoRegistration implements IoRegistration {
463 private final AtomicBoolean canceled = new AtomicBoolean();
464 private final ThreadAwareExecutor executor;
465 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
466 final IoUringIoHandle handle;
467
468 private boolean removeLater;
469 private int outstandingCompletions;
470 private int id;
471
472 DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
473 this.executor = executor;
474 this.handle = handle;
475 }
476
477 void setId(int id) {
478 this.id = id;
479 }
480
481 @Override
482 public long submit(IoOps ops) {
483 IoUringIoOps ioOps = (IoUringIoOps) ops;
484 if (!isValid()) {
485 return INVALID_ID;
486 }
487 if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
488
489
490 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
491 }
492 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
493 if (executor.isExecutorThread(Thread.currentThread())) {
494 submit0(ioOps, udata);
495 } else {
496 executor.execute(() -> submit0(ioOps, udata));
497 }
498 return udata;
499 }
500
501 private void submit0(IoUringIoOps ioOps, long udata) {
502 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
503 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
504 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
505 );
506 outstandingCompletions++;
507 }
508
509 @SuppressWarnings("unchecked")
510 @Override
511 public <T> T attachment() {
512 return (T) IoUringIoHandler.this;
513 }
514
515 @Override
516 public boolean isValid() {
517 return !canceled.get();
518 }
519
520 @Override
521 public boolean cancel() {
522 if (!canceled.compareAndSet(false, true)) {
523
524 return false;
525 }
526 if (executor.isExecutorThread(Thread.currentThread())) {
527 tryRemove();
528 } else {
529 executor.execute(this::tryRemove);
530 }
531 return true;
532 }
533
534 private void tryRemove() {
535 if (outstandingCompletions > 0) {
536
537
538 removeLater = true;
539 return;
540 }
541 remove();
542 }
543
544 private void remove() {
545 DefaultIoUringIoRegistration old = registrations.remove(id);
546 assert old == this;
547 }
548
549 void close() {
550
551
552 assert executor.isExecutorThread(Thread.currentThread());
553 try {
554 handle.close();
555 } catch (Exception e) {
556 logger.debug("Exception during closing " + handle, e);
557 }
558 }
559
560 void handle(int res, int flags, byte op, short data) {
561 event.update(res, flags, op, data);
562 handle.handle(this, event);
563
564
565 if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
566
567 removeLater = false;
568 remove();
569 }
570 }
571 }
572
573 private static IoUringIoHandle cast(IoHandle handle) {
574 if (handle instanceof IoUringIoHandle) {
575 return (IoUringIoHandle) handle;
576 }
577 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
578 }
579
580 @Override
581 public void wakeup() {
582 if (!executor.isExecutorThread(Thread.currentThread()) &&
583 !eventfdAsyncNotify.getAndSet(true)) {
584
585 Native.eventFdWrite(eventfd.intValue(), 1L);
586 }
587 }
588
589 @Override
590 public boolean isCompatible(Class<? extends IoHandle> handleType) {
591 return IoUringIoHandle.class.isAssignableFrom(handleType);
592 }
593
594 IovArray iovArray() {
595 if (iovArray.isFull()) {
596
597 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
598 }
599 assert iovArray.count() == 0;
600 return iovArray;
601 }
602
603
604
605
606 byte[] inet4AddressArray() {
607 return inet4AddressArray;
608 }
609
610
611
612
613 byte[] inet6AddressArray() {
614 return inet6AddressArray;
615 }
616
617
618
619
620
621
622 public static IoHandlerFactory newFactory() {
623 return newFactory(new IoUringIoHandlerConfig());
624 }
625
626
627
628
629
630
631
632
633 public static IoHandlerFactory newFactory(int ringSize) {
634 IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
635 configuration.setRingSize(ringSize);
636 return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
637 }
638
639
640
641
642
643
644
645 public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
646 IoUring.ensureAvailability();
647 ObjectUtil.checkNotNull(config, "config");
648 return eventLoop -> new IoUringIoHandler(eventLoop, config);
649 }
650 }