1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoHandlerContext;
19 import io.netty.channel.IoHandle;
20 import io.netty.channel.IoHandler;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.channel.unix.Errors;
25 import io.netty.channel.unix.FileDescriptor;
26 import io.netty.util.collection.IntObjectHashMap;
27 import io.netty.util.collection.IntObjectMap;
28 import io.netty.util.concurrent.ThreadAwareExecutor;
29 import io.netty.util.internal.ObjectUtil;
30 import io.netty.util.internal.PlatformDependent;
31 import io.netty.util.internal.StringUtil;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.io.IOException;
36 import java.io.UncheckedIOException;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42
43 import static java.lang.Math.max;
44 import static java.lang.Math.min;
45 import static java.util.Objects.requireNonNull;
46
47
48
49
50 public final class IoUringIoHandler implements IoHandler {
51 private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
52 private static final short RING_CLOSE = 1;
53
54 private final RingBuffer ringBuffer;
55 private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
56 private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
57
58 private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
59 private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
60
61 private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
62 private final FileDescriptor eventfd;
63 private final long eventfdReadBuf;
64 private final long timeoutMemoryAddress;
65
66 private long eventfdReadSubmitted;
67 private boolean eventFdClosing;
68 private volatile boolean shuttingDown;
69 private boolean closeCompleted;
70 private int nextRegistrationId = Integer.MIN_VALUE;
71
72
73 private static final int EVENTFD_ID = Integer.MAX_VALUE;
74 private static final int RINGFD_ID = EVENTFD_ID - 1;
75 private static final int INVALID_ID = 0;
76
77 private static final int KERNEL_TIMESPEC_SIZE = 16;
78
79 private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
80 private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
81
82 private final CompletionBuffer completionBuffer;
83 private final ThreadAwareExecutor executor;
84
85 IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
86
87 IoUring.ensureAvailability();
88 this.executor = requireNonNull(executor, "executor");
89 requireNonNull(config, "config");
90 int setupFlags = Native.setupFlags();
91
92
93
94 int cqSize = 2 * config.getRingSize();
95 if (config.needSetupCqeSize()) {
96 if (!IoUring.isSetupCqeSizeSupported()) {
97 throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
98 }
99 setupFlags |= Native.IORING_SETUP_CQSIZE;
100 cqSize = config.checkCqSize(config.getCqSize());
101 }
102 this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
103 if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
104 int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
105 int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
106 int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
107 if (result < 0) {
108
109 ringBuffer.close();
110 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
111 }
112 }
113
114 registeredIoUringBufferRing = new IntObjectHashMap<>();
115 Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
116 if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
117 if (!IoUring.isRegisterBufferRingSupported()) {
118
119 ringBuffer.close();
120 throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
121 }
122 for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
123 try {
124 IoUringBufferRing ring = newBufferRing(bufferRingConfig);
125 registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
126 } catch (Errors.NativeIoException e) {
127 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
128 bufferRing.close();
129 }
130
131 ringBuffer.close();
132 throw new UncheckedIOException(e);
133 }
134 }
135 }
136
137 registrations = new IntObjectHashMap<>();
138 eventfd = Native.newBlockingEventFd();
139 eventfdReadBuf = PlatformDependent.allocateMemory(8);
140 this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE);
141
142
143
144 completionBuffer = new CompletionBuffer(ringBuffer.ioUringCompletionQueue().ringCapacity * 2, 0);
145 }
146
147 @Override
148 public void initialize() {
149 ringBuffer.enable();
150
151 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
152 bufferRing.fill();
153 }
154 }
155
156 @Override
157 public int run(IoHandlerContext context) {
158 int processedPerRun = 0;
159 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
160 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
161 if (!completionQueue.hasCompletions() && context.canBlock()) {
162 if (eventfdReadSubmitted == 0) {
163 submitEventFdRead();
164 }
165 long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
166 submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
167 } else {
168 submissionQueue.submit();
169 }
170 for (;;) {
171
172
173 int processed = drainAndProcessAll(completionQueue, this::handle);
174 processedPerRun += processed;
175
176
177
178
179 if (submissionQueue.submit() == 0 && processed == 0) {
180 break;
181 }
182 }
183
184 return processedPerRun;
185 }
186
187 void submitAndRunNow(long udata) {
188 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
189 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
190 if (submissionQueue.submit() > 0) {
191 completionBuffer.drain(completionQueue);
192 completionBuffer.processOneNow(this::handle, udata);
193 }
194 }
195
196 IoUringBufferRing newBufferRing(IoUringBufferRingConfig bufferRingConfig) throws Errors.NativeIoException {
197 int ringFd = ringBuffer.fd();
198 short bufferRingSize = bufferRingConfig.bufferRingSize();
199 short bufferGroupId = bufferRingConfig.bufferGroupId();
200 int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
201 long ioUringBufRingAddr = Native.ioUringRegisterBuffRing(ringFd, bufferRingSize, bufferGroupId, flags);
202 if (ioUringBufRingAddr < 0) {
203 throw Errors.newIOException("ioUringRegisterBuffRing", (int) ioUringBufRingAddr);
204 }
205 return new IoUringBufferRing(ringFd, ioUringBufRingAddr,
206 bufferRingSize, bufferRingConfig.maxUnreleasedBuffers(),
207 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator()
208 );
209 }
210
211 IoUringBufferRing findBufferRing(short bgId) {
212 IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
213 if (cached != null) {
214 return cached;
215 }
216 throw new IllegalArgumentException(
217 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
218 );
219 }
220
221 private int drainAndProcessAll(CompletionQueue completionQueue, CompletionCallback callback) {
222 int processed = 0;
223 for (;;) {
224 boolean drainedAll = completionBuffer.drain(completionQueue);
225 processed += completionBuffer.processNow(callback);
226 if (drainedAll) {
227 break;
228 }
229 }
230 return processed;
231 }
232
233 private static void handleLoopException(Throwable throwable) {
234 logger.warn("Unexpected exception in the IO event loop.", throwable);
235
236
237
238 try {
239 Thread.sleep(100);
240 } catch (InterruptedException ignore) {
241
242 }
243 }
244
245 private boolean handle(int res, int flags, long udata) {
246 try {
247 int id = UserData.decodeId(udata);
248 byte op = UserData.decodeOp(udata);
249 short data = UserData.decodeData(udata);
250
251 if (logger.isTraceEnabled()) {
252 logger.trace("completed(ring {}): {}(id={}, res={})",
253 ringBuffer.fd(), Native.opToStr(op), data, res);
254 }
255 if (id == EVENTFD_ID) {
256 handleEventFdRead();
257 return true;
258 }
259 if (id == RINGFD_ID) {
260 if (op == Native.IORING_OP_NOP && data == RING_CLOSE) {
261 completeRingClose();
262 }
263 return true;
264 }
265 DefaultIoUringIoRegistration registration = registrations.get(id);
266 if (registration == null) {
267 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
268 Native.opToStr(op), id, res);
269 return true;
270 }
271 registration.handle(res, flags, op, data);
272 return true;
273 } catch (Error e) {
274 throw e;
275 } catch (Throwable throwable) {
276 handleLoopException(throwable);
277 return true;
278 }
279 }
280
281 private void handleEventFdRead() {
282 eventfdReadSubmitted = 0;
283 if (!eventFdClosing) {
284 eventfdAsyncNotify.set(false);
285 submitEventFdRead();
286 }
287 }
288
289 private void submitEventFdRead() {
290 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
291 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
292
293 eventfdReadSubmitted = submissionQueue.addEventFdRead(
294 eventfd.intValue(), eventfdReadBuf, 0, 8, udata);
295 }
296
297 private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
298 boolean linkTimeout, long timeoutNanoSeconds) {
299 if (timeoutNanoSeconds != -1) {
300 long udata = UserData.encode(RINGFD_ID,
301 linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
302
303
304 long seconds, nanoSeconds;
305 if (timeoutNanoSeconds == 0) {
306 seconds = 0;
307 nanoSeconds = 0;
308 } else {
309 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
310 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
311 }
312
313 PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
314 PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
315
316 if (linkTimeout) {
317 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
318 } else {
319 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
320 }
321 }
322 return submissionQueue.submitAndWait();
323 }
324
325 @Override
326 public void prepareToDestroy() {
327 shuttingDown = true;
328 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
329 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
330
331 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
332
333 for (DefaultIoUringIoRegistration registration: copy) {
334 registration.close();
335 }
336
337
338 Native.eventFdWrite(eventfd.intValue(), 1L);
339
340
341 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
342 submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
343
344
345 submissionQueue.submitAndWait();
346 while (completionQueue.hasCompletions()) {
347 completionQueue.process(this::handle);
348
349 if (submissionQueue.count() > 0) {
350 submissionQueue.submit();
351 }
352 }
353 }
354
355 @Override
356 public void destroy() {
357 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
358 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
359 drainEventFd();
360 if (submissionQueue.remaining() < 2) {
361
362
363 submissionQueue.submit();
364 }
365
366 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
367
368
369
370
371
372 submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
373
374 submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
375 completionQueue.process(this::handle);
376 for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
377 ioUringBufferRing.close();
378 }
379 completeRingClose();
380 }
381
382
383
384
385
386 private void drainEventFd() {
387 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
388 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
389 assert !eventFdClosing;
390 eventFdClosing = true;
391 boolean eventPending = eventfdAsyncNotify.getAndSet(true);
392 if (eventPending) {
393
394
395 while (eventfdReadSubmitted == 0) {
396 submitEventFdRead();
397 submissionQueue.submit();
398 }
399
400 class DrainFdEventCallback implements CompletionCallback {
401 boolean eventFdDrained;
402
403 @Override
404 public boolean handle(int res, int flags, long udata) {
405 if (UserData.decodeId(udata) == EVENTFD_ID) {
406 eventFdDrained = true;
407 }
408 return IoUringIoHandler.this.handle(res, flags, udata);
409 }
410 }
411 final DrainFdEventCallback handler = new DrainFdEventCallback();
412 drainAndProcessAll(completionQueue, handler);
413 completionQueue.process(handler);
414 while (!handler.eventFdDrained) {
415 submissionQueue.submitAndWait();
416 drainAndProcessAll(completionQueue, handler);
417 }
418 }
419
420
421
422 if (eventfdReadSubmitted != 0) {
423 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
424 submissionQueue.addCancel(eventfdReadSubmitted, udata);
425 eventfdReadSubmitted = 0;
426 submissionQueue.submit();
427 }
428 }
429
430 private void completeRingClose() {
431 if (closeCompleted) {
432
433 return;
434 }
435 closeCompleted = true;
436 ringBuffer.close();
437 try {
438 eventfd.close();
439 } catch (IOException e) {
440 logger.warn("Failed to close eventfd", e);
441 }
442 PlatformDependent.freeMemory(eventfdReadBuf);
443 PlatformDependent.freeMemory(timeoutMemoryAddress);
444 }
445
446 @Override
447 public IoRegistration register(IoHandle handle) throws Exception {
448 IoUringIoHandle ioHandle = cast(handle);
449 if (shuttingDown) {
450 throw new IllegalStateException("IoUringIoHandler is shutting down");
451 }
452 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
453 for (;;) {
454 int id = nextRegistrationId();
455 DefaultIoUringIoRegistration old = registrations.put(id, registration);
456 if (old != null) {
457 assert old.handle != registration.handle;
458 registrations.put(id, old);
459 } else {
460 registration.setId(id);
461 break;
462 }
463 }
464
465 return registration;
466 }
467
468 private int nextRegistrationId() {
469 int id;
470 do {
471 id = nextRegistrationId++;
472 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
473 return id;
474 }
475
476 private final class DefaultIoUringIoRegistration implements IoRegistration {
477 private final AtomicBoolean canceled = new AtomicBoolean();
478 private final ThreadAwareExecutor executor;
479 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
480 final IoUringIoHandle handle;
481
482 private boolean removeLater;
483 private int outstandingCompletions;
484 private int id;
485
486 DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
487 this.executor = executor;
488 this.handle = handle;
489 }
490
491 void setId(int id) {
492 this.id = id;
493 }
494
495 @Override
496 public long submit(IoOps ops) {
497 IoUringIoOps ioOps = (IoUringIoOps) ops;
498 if (!isValid()) {
499 return INVALID_ID;
500 }
501 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
502 if (executor.isExecutorThread(Thread.currentThread())) {
503 submit0(ioOps, udata);
504 } else {
505 executor.execute(() -> submit0(ioOps, udata));
506 }
507 return udata;
508 }
509
510 private void submit0(IoUringIoOps ioOps, long udata) {
511 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
512 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
513 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
514 );
515 outstandingCompletions++;
516 }
517
518 @SuppressWarnings("unchecked")
519 @Override
520 public <T> T attachment() {
521 return (T) IoUringIoHandler.this;
522 }
523
524 @Override
525 public boolean isValid() {
526 return !canceled.get();
527 }
528
529 @Override
530 public boolean cancel() {
531 if (!canceled.compareAndSet(false, true)) {
532
533 return false;
534 }
535 if (executor.isExecutorThread(Thread.currentThread())) {
536 tryRemove();
537 } else {
538 executor.execute(this::tryRemove);
539 }
540 return true;
541 }
542
543 private void tryRemove() {
544 if (outstandingCompletions > 0) {
545
546
547 removeLater = true;
548 return;
549 }
550 remove();
551 }
552
553 private void remove() {
554 DefaultIoUringIoRegistration old = registrations.remove(id);
555 assert old == this;
556 }
557
558 void close() {
559
560
561 assert executor.isExecutorThread(Thread.currentThread());
562 try {
563 handle.close();
564 } catch (Exception e) {
565 logger.debug("Exception during closing " + handle, e);
566 }
567 }
568
569 void handle(int res, int flags, byte op, short data) {
570 event.update(res, flags, op, data);
571 handle.handle(this, event);
572 if (--outstandingCompletions == 0 && removeLater) {
573
574 removeLater = false;
575 remove();
576 }
577 }
578 }
579
580 private static IoUringIoHandle cast(IoHandle handle) {
581 if (handle instanceof IoUringIoHandle) {
582 return (IoUringIoHandle) handle;
583 }
584 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
585 }
586
587 @Override
588 public void wakeup() {
589 if (!executor.isExecutorThread(Thread.currentThread()) &&
590 !eventfdAsyncNotify.getAndSet(true)) {
591
592 Native.eventFdWrite(eventfd.intValue(), 1L);
593 }
594 }
595
596 @Override
597 public boolean isCompatible(Class<? extends IoHandle> handleType) {
598 return IoUringIoHandle.class.isAssignableFrom(handleType);
599 }
600
601
602
603
604 byte[] inet4AddressArray() {
605 return inet4AddressArray;
606 }
607
608
609
610
611 byte[] inet6AddressArray() {
612 return inet6AddressArray;
613 }
614
615
616
617
618
619
620 public static IoHandlerFactory newFactory() {
621 return newFactory(new IoUringIoHandlerConfig());
622 }
623
624
625
626
627
628
629
630
631 public static IoHandlerFactory newFactory(int ringSize) {
632 IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
633 configuration.setRingSize(ringSize);
634 return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
635 }
636
637
638
639
640
641
642
643 public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
644 IoUring.ensureAvailability();
645 ObjectUtil.checkNotNull(config, "config");
646 return eventLoop -> new IoUringIoHandler(eventLoop, config);
647 }
648 }