1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoHandle;
19 import io.netty.channel.IoHandler;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.channel.unix.Buffer;
25 import io.netty.channel.unix.Errors;
26 import io.netty.channel.unix.FileDescriptor;
27 import io.netty.channel.unix.IovArray;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.ThreadAwareExecutor;
31 import io.netty.util.internal.CleanableDirectBuffer;
32 import io.netty.util.internal.ObjectUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.io.UncheckedIOException;
39 import java.nio.ByteBuffer;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicInteger;
46
47 import static java.lang.Math.max;
48 import static java.lang.Math.min;
49 import static java.util.Objects.requireNonNull;
50
51
52
53
54 public final class IoUringIoHandler implements IoHandler {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
    // Flag bit OR-ed into wakeupWriters by closeWakeupGate(); wakeup() returns without writing once it is set.
    private static final int WAKEUP_CLOSED = 1 << 30;

    private final RingBuffer ringBuffer;
    // Buffer rings created from the handler config, keyed by buffer group id.
    private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
    // Live registrations keyed by the id assigned in register().
    private final IntObjectMap<DefaultIoUringIoRegistration> registrations;

    // Scratch arrays handed out by inet4AddressArray() / inet6AddressArray().
    private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
    private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];

    // Set by wakeup() before it writes to the eventfd and reset in handleEventFdRead(); while it is set,
    // further wakeup() calls do not write again.
    private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
    // Number of wakeup() calls currently writing to the eventfd, plus the WAKEUP_CLOSED flag bit.
    private final AtomicInteger wakeupWriters = new AtomicInteger();
    private final FileDescriptor eventfd;
    // Long.BYTES of direct memory that eventfd reads submitted through the ring are directed at.
    private final CleanableDirectBuffer eventfdReadBufCleanable;
    private final ByteBuffer eventfdReadBuf;
    private final long eventfdReadBufAddress;
    // KERNEL_TIMESPEC_SIZE bytes of direct memory holding the timespec passed to addTimeout/addLinkTimeout.
    private final CleanableDirectBuffer timeoutMemoryCleanable;
    private final ByteBuffer timeoutMemory;
    private final long timeoutMemoryAddress;
    // Both arrays are cleared after every submit performed by this handler.
    private final IovArray iovArray;
    private final MsgHdrMemoryArray msgHdrMemoryArray;
    // Value returned by addEventFdRead(...) for the outstanding eventfd read, or 0 when none is outstanding.
    private long eventfdReadSubmitted;
    // Set by drainEventFd(); stops handleEventFdRead() from re-arming the eventfd read.
    private boolean eventFdClosing;
    // Set by prepareToDestroy(); register() rejects new handles afterwards.
    private volatile boolean shuttingDown;
    // Set by completeRingClose(); run() becomes a no-op afterwards.
    private boolean closeCompleted;
    // Bookkeeping for slow-path submissions: maps a token to (registration id, opcode, user data).
    private final PendingOpMap pendingOps;
    // Next registration id to try; wraps from Integer.MAX_VALUE back to 1 (see nextRegistrationId()).
    private int nextRegistrationId = 1;

    // Returned by IoRegistration.submit(...) when the registration is no longer valid.
    private static final long INVALID_ID = 0;
    // User-data token attached to eventfd reads (and their cancellation).
    private static final long EVENTFD_TOKEN = PendingOpMap.token(1);
    // User-data token attached to the handler's own NOP / timeout submissions; completions are ignored.
    private static final long RINGFD_TOKEN = PendingOpMap.token(2);
    private static final int KERNEL_TIMESPEC_SIZE = 16;

    // Byte offsets of the two fields inside timeoutMemory.
    private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
    private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;

    private final ThreadAwareExecutor executor;
92
93 IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
94
95 IoUring.ensureAvailability();
96 this.executor = requireNonNull(executor, "executor");
97 requireNonNull(config, "config");
98 int setupFlags = Native.setupFlags(config.singleIssuer());
99
100
101
102 int cqSize = 2 * config.getRingSize();
103 if (config.needSetupCqeSize()) {
104 assert IoUring.isSetupCqeSizeSupported();
105 setupFlags |= Native.IORING_SETUP_CQSIZE;
106 cqSize = config.getCqSize();
107 }
108 this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
109 if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
110 int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
111 int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
112 int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
113 if (result < 0) {
114
115 ringBuffer.close();
116 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
117 }
118 }
119
120 registeredIoUringBufferRing = new IntObjectHashMap<>();
121 Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
122 if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
123 for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
124 try {
125 IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
126 registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
127 } catch (Errors.NativeIoException e) {
128 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
129 bufferRing.close();
130 }
131
132 ringBuffer.close();
133 throw new UncheckedIOException(e);
134 }
135 }
136 }
137
138 registrations = new IntObjectHashMap<>();
139 pendingOps = new PendingOpMap(IoUring.DEFAULT_PENDING_OPS_INITIAL_CAPACITY);
140 eventfd = Native.newBlockingEventFd();
141 eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
142 eventfdReadBuf = eventfdReadBufCleanable.buffer();
143 eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
144 timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
145 timeoutMemory = timeoutMemoryCleanable.buffer();
146 timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
147 iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
148 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
149 }
150
151 @Override
152 public void initialize() {
153 ringBuffer.enable();
154
155 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
156 bufferRing.initialize();
157 }
158 }
159
160 @Override
161 public int run(IoHandlerContext context) {
162 if (closeCompleted) {
163 if (context.shouldReportActiveIoTime()) {
164 context.reportActiveIoTime(0);
165 }
166 return 0;
167 }
168 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
169 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
170 if (!completionQueue.hasCompletions() && context.canBlock()) {
171 if (eventfdReadSubmitted == 0) {
172 submitEventFdRead();
173 }
174 long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
175 submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
176 } else {
177
178 submitAndClearNow(submissionQueue);
179 }
180
181 int processed;
182 if (context.shouldReportActiveIoTime()) {
183
184 long activeIoStartTimeNanos = System.nanoTime();
185 processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
186 long activeIoEndTimeNanos = System.nanoTime();
187 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
188 } else {
189 processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
190 }
191 return processed;
192 }
193
194 private boolean needSubmit(int sqFlags) {
195 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
196 return submissionQueue.count() > 0
197 || (sqFlags & (Native.IORING_SQ_CQ_OVERFLOW | Native.IORING_SQ_TASKRUN)) != 0;
198 }
199
200 private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
201 CompletionCallback callback) {
202 int processed = 0;
203
204
205 for (int i = 0; i < 128; i++) {
206 int p = completionQueue.process(callback);
207 int sqFlags = submissionQueue.flags();
208 if ((sqFlags & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
209 logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
210 completionQueue.ringEntries);
211 }
212 if (p == 0) {
213 if (!needSubmit(sqFlags)) {
214 break;
215 }
216 submitAndClearNow0(submissionQueue);
217 }
218 processed += p;
219 }
220 return processed;
221 }
222
223 private int submitAndClearNow(SubmissionQueue submissionQueue) {
224 if (needSubmit(submissionQueue.flags())) {
225 return submitAndClearNow0(submissionQueue);
226 }
227 return 0;
228 }
229
230 private int submitAndClearNow0(SubmissionQueue submissionQueue) {
231
232 int submitted = submissionQueue.submitAndGetNow();
233
234
235
236 iovArray.clear();
237 msgHdrMemoryArray.clear();
238 return submitted;
239 }
240
241 private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
242 throws Errors.NativeIoException {
243 short bufferRingSize = bufferRingConfig.bufferRingSize();
244 short bufferGroupId = bufferRingConfig.bufferGroupId();
245 int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
246 long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
247 if (ioUringBufRingAddr < 0) {
248 throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
249 }
250 return new IoUringBufferRing(ringFd,
251 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
252 bufferRingSize, bufferRingConfig.batchSize(),
253 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
254 bufferRingConfig.isBatchAllocation()
255 );
256 }
257
258 IoUringBufferRing findBufferRing(short bgId) {
259 IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
260 if (cached != null) {
261 return cached;
262 }
263 throw new IllegalArgumentException(
264 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
265 );
266 }
267
268 private static void handleLoopException(Throwable throwable) {
269 logger.warn("Unexpected exception in the IO event loop.", throwable);
270
271
272
273 try {
274 Thread.sleep(100);
275 } catch (InterruptedException ignore) {
276
277 }
278 }
279
280 private void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
281 try {
282 if (udata == EVENTFD_TOKEN) {
283 handleEventFdRead();
284 return;
285 }
286 if (udata == RINGFD_TOKEN) {
287 return;
288 }
289 if (udata >= 0) {
290 handleFastPath(res, flags, udata, extraCqeData);
291 return;
292 }
293 handleSlowPath(res, flags, udata, extraCqeData);
294 } catch (Error e) {
295 throw e;
296 } catch (Throwable throwable) {
297 handleLoopException(throwable);
298 }
299 }
300
301 private void handleFastPath(int res, int flags, long udata, ByteBuffer extraCqeData) {
302 int id = UserData.decodeId(udata);
303 byte op = UserData.decodeOp(udata);
304 long userData = UserData.decodeData(udata);
305 DefaultIoUringIoRegistration registration = registrations.get(id);
306 if (registration != null) {
307 traceCompletion(registration, id, op, res);
308 registration.handle(res, flags, op, userData, extraCqeData);
309 return;
310 }
311 if (logger.isDebugEnabled()) {
312 logger.debug("ignoring packed completion for unknown registration (registrationId={}, op={}, userData={},"
313 + " res={})",
314 id, Native.opToStr(op), userData, res);
315 }
316 }
317
318 private void handleSlowPath(int res, int flags, long udata, ByteBuffer extraCqeData) {
319 long sequence = PendingOpMap.tokenSequence(udata);
320 int slot = pendingOps.findSlot(udata);
321 if (slot != -1) {
322 int registrationId = pendingOps.registrationId(slot);
323 DefaultIoUringIoRegistration registration = registrations.get(registrationId);
324 byte op = pendingOps.op(slot);
325 long userData = pendingOps.userData(slot);
326
327
328 if ((flags & Native.IORING_CQE_F_MORE) == 0) {
329 pendingOps.release(slot);
330 }
331
332
333 if (registration != null) {
334 traceCompletion(registration, registrationId, op, res);
335 registration.handle(res, flags, op, userData, extraCqeData);
336 return;
337 }
338 if (logger.isDebugEnabled()) {
339 logger.debug("ignoring slow-path completion for missing registration (registrationId={}, seq={}, "
340 + "op={}, userData={}, res={})",
341 registrationId, sequence, Native.opToStr(op), userData, res);
342 }
343 return;
344 }
345 if (logger.isDebugEnabled()) {
346 logger.debug("ignoring slow-path completion for unknown sequence (seq={}, res={})", sequence, res);
347 }
348 }
349
350 private void traceCompletion(DefaultIoUringIoRegistration registration, int registrationId, byte op, int res) {
351 if (!logger.isTraceEnabled()) {
352 return;
353 }
354 int fd = registration.fd();
355 if (fd != -1) {
356 logger.trace("completed(ring {}): {}(fd={}, res={})",
357 ringBuffer.fd(), Native.opToStr(op), fd, res);
358 } else {
359 logger.trace("completed(ring {}): {}(registrationId={}, res={})",
360 ringBuffer.fd(), Native.opToStr(op), registrationId, res);
361 }
362 }
363
364 private void handleEventFdRead() {
365 eventfdReadSubmitted = 0;
366 if (!eventFdClosing) {
367 eventfdAsyncNotify.set(false);
368 submitEventFdRead();
369 }
370 }
371
372 private void submitEventFdRead() {
373 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
374 eventfdReadSubmitted = submissionQueue.addEventFdRead(
375 eventfd.intValue(), eventfdReadBufAddress, 0, 8, EVENTFD_TOKEN);
376 }
377
378 private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
379 boolean linkTimeout, long timeoutNanoSeconds) {
380 if (timeoutNanoSeconds != -1) {
381
382
383 long seconds, nanoSeconds;
384 if (timeoutNanoSeconds == 0) {
385 seconds = 0;
386 nanoSeconds = 0;
387 } else {
388 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
389 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
390 }
391
392 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
393 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
394 if (linkTimeout) {
395 submissionQueue.addLinkTimeout(timeoutMemoryAddress, RINGFD_TOKEN);
396 } else {
397 submissionQueue.addTimeout(timeoutMemoryAddress, RINGFD_TOKEN);
398 }
399 }
400 int submitted = submissionQueue.submitAndGet();
401
402
403 iovArray.clear();
404 msgHdrMemoryArray.clear();
405 return submitted;
406 }
407
408 @Override
409 public void prepareToDestroy() {
410 shuttingDown = true;
411 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
412 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
413
414 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
415
416 for (DefaultIoUringIoRegistration registration: copy) {
417 registration.close();
418 }
419
420
421 Native.eventFdWrite(eventfd.intValue(), 1L);
422
423
424 submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, RINGFD_TOKEN);
425
426
427 submissionQueue.submitAndGet();
428
429 while (completionQueue.hasCompletions()) {
430 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
431 if (submissionQueue.count() > 0) {
432 submissionQueue.submitAndGetNow();
433 }
434 }
435 }
436
    /**
     * Final shutdown phase: drains the eventfd, queues a draining NOP linked to a 200 ms timeout, waits for
     * it, processes whatever completed, closes all buffer rings and finally releases the ring and the
     * remaining native resources.
     */
    @Override
    public void destroy() {
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        drainEventFd();
        if (submissionQueue.remaining() < 2) {
            // Two entries are queued below (the NOP and its link timeout). Submit now so both fit into
            // the queue together.
            submissionQueue.submit();
        }

        // The NOP carries IOSQE_IO_DRAIN and IOSQE_LINK; the link flag is what ties it to the link timeout
        // queued by submitAndWaitWithTimeout(..., true, ...) right after it.
        submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), RINGFD_TOKEN);

        // Bound the wait to 200 milliseconds.
        submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
        completionQueue.process(this::handle);
        for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
            ioUringBufferRing.close();
        }
        completeRingClose();
    }
462
463
464
465
466
    /**
     * Quiesces the eventfd before the ring is closed. After this method, {@link #wakeup()} no longer writes
     * to the eventfd (the notify flag stays set) and no eventfd read is tracked as outstanding.
     */
    private void drainEventFd() {
        CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
        assert !eventFdClosing;
        // From here on handleEventFdRead() will not re-arm the read.
        eventFdClosing = true;
        // Leaving the flag set makes every later wakeup() skip its eventfd write. The previous value tells
        // whether a wakeup() had already claimed the flag, i.e. whether a write may be pending or in flight.
        boolean eventPending = eventfdAsyncNotify.getAndSet(true);
        if (eventPending) {
            // A write was (or is being) issued by another thread: make sure a read is outstanding so that
            // write gets consumed.
            while (eventfdReadSubmitted == 0) {
                submitEventFdRead();
                submissionQueue.submit();
            }

            // Wraps the regular handler and records when the eventfd completion has been seen.
            class DrainFdEventCallback implements CompletionCallback {
                boolean eventFdDrained;

                @Override
                public void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
                    if (udata == EVENTFD_TOKEN) {
                        eventFdDrained = true;
                    }
                    IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
                }
            }
            final DrainFdEventCallback handler = new DrainFdEventCallback();
            completionQueue.process(handler);
            // Keep submitting and processing until the eventfd completion arrives.
            while (!handler.eventFdDrained) {
                submissionQueue.submitAndGet();
                processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
            }
        }

        // A read is still outstanding (no wakeup consumed it): cancel it. The cancel is tagged with
        // EVENTFD_TOKEN as well, so its completion is routed to handleEventFdRead().
        if (eventfdReadSubmitted != 0) {
            submissionQueue.addCancel(eventfdReadSubmitted, EVENTFD_TOKEN);
            eventfdReadSubmitted = 0;
            submissionQueue.submit();
        }
    }
508
509 private void completeRingClose() {
510 if (closeCompleted) {
511
512 return;
513 }
514 closeCompleted = true;
515 ringBuffer.close();
516 closeWakeupGate();
517 try {
518 eventfd.close();
519 } catch (IOException e) {
520 logger.warn("Failed to close eventfd", e);
521 }
522 eventfdReadBufCleanable.clean();
523 timeoutMemoryCleanable.clean();
524 iovArray.release();
525 msgHdrMemoryArray.release();
526 }
527
528 @Override
529 public IoRegistration register(IoHandle handle) throws Exception {
530 IoUringIoHandle ioHandle = cast(handle);
531 if (shuttingDown) {
532 throw new IllegalStateException("IoUringIoHandler is shutting down");
533 }
534 int startId = nextRegistrationId;
535 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
536 for (;;) {
537 int id = nextRegistrationId();
538 DefaultIoUringIoRegistration old = registrations.put(id, registration);
539 if (old != null) {
540 assert old.handle != registration.handle;
541 registrations.put(id, old);
542 if (nextRegistrationId == startId) {
543 throw new IllegalStateException("registration id space exhausted");
544 }
545 } else {
546 registration.setId(id);
547 ioHandle.registered();
548 break;
549 }
550 }
551
552 return registration;
553 }
554
555 private int nextRegistrationId() {
556
557
558 int id = nextRegistrationId;
559 nextRegistrationId = id == Integer.MAX_VALUE ? 1 : id + 1;
560 return id;
561 }
562
563 private final class DefaultIoUringIoRegistration implements IoRegistration {
564 private final AtomicBoolean canceled = new AtomicBoolean();
565 private final ThreadAwareExecutor executor;
566 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, 0L);
567 final IoUringIoHandle handle;
568
569 private boolean removeLater;
570 private int outstandingCompletions;
571 private int id;
572
573 DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
574 this.executor = executor;
575 this.handle = handle;
576 }
577
578 void setId(int id) {
579 this.id = id;
580 }
581
582 @Override
583 public long submit(IoOps ops) {
584 IoUringIoOps ioOps = (IoUringIoOps) ops;
585 if (!isValid()) {
586 return INVALID_ID;
587 }
588 if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
589
590
591 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
592 }
593 long userData = ioOps.userData();
594
595 if (canUseFastPath(userData)) {
596 long packedSeq = UserData.encode(id, ioOps.opcode(), (short) userData);
597 if (executor.isExecutorThread(Thread.currentThread())) {
598 submitFastPath0(ioOps, packedSeq);
599 } else {
600 executor.execute(() -> submitFastPath0(ioOps, packedSeq));
601 }
602 return packedSeq;
603 }
604 long token = pendingOps.nextToken();
605 if (executor.isExecutorThread(Thread.currentThread())) {
606 submitSlowPath0(ioOps, token, userData);
607 } else {
608 executor.execute(() -> submitSlowPath0(ioOps, token, userData));
609 }
610 return token;
611 }
612
613 private void submitFastPath0(IoUringIoOps ioOps, long seq) {
614 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
615 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), seq,
616 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
617 );
618 outstandingCompletions++;
619 }
620
621 private void submitSlowPath0(IoUringIoOps ioOps, long token, long userData) {
622 pendingOps.registerNormal(token, id, ioOps.opcode(), userData);
623 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
624 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), token,
625 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
626 );
627 outstandingCompletions++;
628 }
629
630 private boolean canUseFastPath(long userData) {
631 return ((short) userData) == userData;
632 }
633
634 private int fd() {
635 if (handle instanceof AbstractIoUringChannel) {
636 return ((AbstractIoUringChannel) handle).fd().intValue();
637 }
638 return -1;
639 }
640
641 @SuppressWarnings("unchecked")
642 @Override
643 public <T> T attachment() {
644 return (T) IoUringIoHandler.this;
645 }
646
647 @Override
648 public boolean isValid() {
649 return !canceled.get();
650 }
651
652 @Override
653 public boolean cancel() {
654 if (!canceled.compareAndSet(false, true)) {
655
656 return false;
657 }
658 if (executor.isExecutorThread(Thread.currentThread())) {
659 tryRemove();
660 } else {
661 executor.execute(this::tryRemove);
662 }
663 return true;
664 }
665
666 private void tryRemove() {
667 if (outstandingCompletions > 0) {
668
669
670 removeLater = true;
671 return;
672 }
673 remove();
674 }
675
676 private void remove() {
677 DefaultIoUringIoRegistration old = registrations.remove(id);
678 assert old == this;
679 handle.unregistered();
680 }
681
682 void close() {
683
684
685 assert executor.isExecutorThread(Thread.currentThread());
686 try {
687 handle.close();
688 } catch (Exception e) {
689 logger.debug("Exception during closing " + handle, e);
690 }
691 }
692
693 void handle(int res, int flags, byte op, long userData, ByteBuffer extraCqeData) {
694 event.update(res, flags, op, userData, extraCqeData);
695 handle.handle(this, event);
696
697
698 if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
699
700 removeLater = false;
701 remove();
702 }
703 }
704 }
705
706 private static IoUringIoHandle cast(IoHandle handle) {
707 if (handle instanceof IoUringIoHandle) {
708 return (IoUringIoHandle) handle;
709 }
710 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
711 }
712
    /**
     * Wakes the handler by writing to the eventfd. Nothing is written when called from the executor thread,
     * when a wakeup is already pending, or after the wakeup gate has been closed.
     */
    @Override
    public void wakeup() {
        // getAndSet(true) lets only the first caller through until handleEventFdRead() resets the flag.
        if (!executor.isExecutorThread(Thread.currentThread()) &&
                !eventfdAsyncNotify.getAndSet(true)) {
            // Register as an in-flight writer, unless closeWakeupGate() has already set WAKEUP_CLOSED.
            // closeWakeupGate() spins until the writer count is back to zero, and completeRingClose()
            // closes the eventfd only after that.
            int s;
            do {
                s = wakeupWriters.get();
                if ((s & WAKEUP_CLOSED) != 0) {
                    return;
                }
            } while (!wakeupWriters.compareAndSet(s, s + 1));
            try {
                // Write to the eventfd so the eventfd read armed on the ring completes.
                Native.eventFdWrite(eventfd.intValue(), 1L);
            } finally {
                // Always deregister, even if the write throws.
                wakeupWriters.decrementAndGet();
            }
        }
    }
736
737 private void closeWakeupGate() {
738 int s;
739 do {
740 s = wakeupWriters.get();
741 } while (!wakeupWriters.compareAndSet(s, s | WAKEUP_CLOSED));
742
743
744 while ((wakeupWriters.get() & ~WAKEUP_CLOSED) != 0) {
745 Thread.onSpinWait();
746 }
747 }
748
749 @Override
750 public boolean isCompatible(Class<? extends IoHandle> handleType) {
751 return IoUringIoHandle.class.isAssignableFrom(handleType);
752 }
753
754 IovArray iovArray() {
755 if (iovArray.isFull()) {
756
757 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
758 assert iovArray.count() == 0;
759 }
760 return iovArray;
761 }
762
763 MsgHdrMemoryArray msgHdrMemoryArray() {
764 if (msgHdrMemoryArray.isFull()) {
765
766 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
767 }
768 return msgHdrMemoryArray;
769 }
770
771
772
773
774 byte[] inet4AddressArray() {
775 return inet4AddressArray;
776 }
777
778
779
780
781 byte[] inet6AddressArray() {
782 return inet6AddressArray;
783 }
784
785
786
787
788
789
790 public static IoHandlerFactory newFactory() {
791 return newFactory(new IoUringIoHandlerConfig());
792 }
793
794
795
796
797
798
799
800
801 public static IoHandlerFactory newFactory(int ringSize) {
802 IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
803 configuration.setRingSize(ringSize);
804 return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
805 }
806
807
808
809
810
811
812
813 public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
814 IoUring.ensureAvailability();
815 final IoUringIoHandlerConfig copy = ObjectUtil.checkNotNull(config, "config").verifyAndClone();
816 return new IoHandlerFactory() {
817 @Override
818 public IoHandler newHandler(ThreadAwareExecutor eventLoop) {
819 return new IoUringIoHandler(eventLoop, copy);
820 }
821
822 @Override
823 public boolean isChangingThreadSupported() {
824 return !copy.singleIssuer();
825 }
826 };
827 }
828 }