1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoHandle;
19 import io.netty.channel.IoHandler;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.channel.unix.Buffer;
25 import io.netty.channel.unix.Errors;
26 import io.netty.channel.unix.FileDescriptor;
27 import io.netty.channel.unix.IovArray;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.ThreadAwareExecutor;
31 import io.netty.util.internal.CleanableDirectBuffer;
32 import io.netty.util.internal.ObjectUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.io.UncheckedIOException;
39 import java.nio.ByteBuffer;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static java.lang.Math.max;
47 import static java.lang.Math.min;
48 import static java.util.Objects.requireNonNull;
49
50
51
52
53 public final class IoUringIoHandler implements IoHandler {
/** Logger used by the handler and its registrations. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);

/** The ring owned by this handler; source of the submission and completion queues. */
private final RingBuffer ringBuffer;
/** Buffer rings registered in the constructor, keyed by their buffer group id. */
private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
/** Active registrations, keyed by the id that is encoded into the user data of their submissions. */
private final IntObjectMap<DefaultIoUringIoRegistration> registrations;

// Pre-sized arrays handed out by inet4AddressArray() / inet6AddressArray().
// NOTE(review): presumably scratch space for address conversion in the channel implementations - confirm.
private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];

/**
 * Set to {@code true} by {@link #wakeup()} right before it writes to the eventfd and reset once the
 * eventfd read completed, so only the first wakeup between two reads performs the write.
 */
private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
/** The eventfd that is written to in order to wake up the ring. */
private final FileDescriptor eventfd;
// Direct memory (Long.BYTES) the eventfd read is submitted with, plus its raw address.
private final CleanableDirectBuffer eventfdReadBufCleanable;
private final ByteBuffer eventfdReadBuf;
private final long eventfdReadBufAddress;
// Direct memory (KERNEL_TIMESPEC_SIZE bytes) holding the timeout that is passed to the ring, plus its raw address.
private final CleanableDirectBuffer timeoutMemoryCleanable;
private final ByteBuffer timeoutMemory;
private final long timeoutMemoryAddress;
// Shared arrays exposed via iovArray() / msgHdrMemoryArray(); both are cleared after every submit.
private final IovArray iovArray;
private final MsgHdrMemoryArray msgHdrMemoryArray;
/** Value returned when the eventfd read was queued; {@code 0} means that no read is outstanding. */
private long eventfdReadSubmitted;
/** Set once {@code drainEventFd()} ran; prevents the eventfd read from being re-armed. */
private boolean eventFdClosing;
/** Set by {@link #prepareToDestroy()}; makes {@code register(...)} reject new handles. Volatile field. */
private volatile boolean shuttingDown;
/** Set once all resources were released; {@code run(...)} returns immediately afterwards. */
private boolean closeCompleted;
/** Next candidate for a registration id; reserved ids are skipped by {@code nextRegistrationId()}. */
private int nextRegistrationId = Integer.MIN_VALUE;


// Ids that are never handed out to registrations.
/** Id encoded into the user data of eventfd reads (and their cancellation). */
private static final int EVENTFD_ID = Integer.MAX_VALUE;
/** Id encoded into the user data of the handler's own NOP / timeout submissions. */
private static final int RINGFD_ID = EVENTFD_ID - 1;
/** Returned by a registration's {@code submit(...)} when the registration is no longer valid. */
private static final int INVALID_ID = 0;

/** Size in bytes of the timeout memory: two 8 byte values (seconds and nanoseconds). */
private static final int KERNEL_TIMESPEC_SIZE = 16;

// Byte offsets of the two values inside the timeout memory.
private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;

/** Executor whose thread drives this handler; used to decide whether work must be handed over. */
private final ThreadAwareExecutor executor;
90
91 IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
92
93 IoUring.ensureAvailability();
94 this.executor = requireNonNull(executor, "executor");
95 requireNonNull(config, "config");
96 int setupFlags = Native.setupFlags(config.singleIssuer());
97
98
99
100 int cqSize = 2 * config.getRingSize();
101 if (config.needSetupCqeSize()) {
102 if (!IoUring.isSetupCqeSizeSupported()) {
103 throw new UnsupportedOperationException("IORING_SETUP_CQSIZE is not supported");
104 }
105 setupFlags |= Native.IORING_SETUP_CQSIZE;
106 cqSize = config.checkCqSize(config.getCqSize());
107 }
108 this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
109 if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
110 int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
111 int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
112 int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
113 if (result < 0) {
114
115 ringBuffer.close();
116 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
117 }
118 }
119
120 registeredIoUringBufferRing = new IntObjectHashMap<>();
121 Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
122 if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
123 if (!IoUring.isRegisterBufferRingSupported()) {
124
125 ringBuffer.close();
126 throw new UnsupportedOperationException("IORING_REGISTER_PBUF_RING is not supported");
127 }
128 for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
129 try {
130 IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
131 registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
132 } catch (Errors.NativeIoException e) {
133 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
134 bufferRing.close();
135 }
136
137 ringBuffer.close();
138 throw new UncheckedIOException(e);
139 }
140 }
141 }
142
143 registrations = new IntObjectHashMap<>();
144 eventfd = Native.newBlockingEventFd();
145 eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
146 eventfdReadBuf = eventfdReadBufCleanable.buffer();
147 eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
148 timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
149 timeoutMemory = timeoutMemoryCleanable.buffer();
150 timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
151 iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
152 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
153 }
154
155 @Override
156 public void initialize() {
157 ringBuffer.enable();
158
159 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
160 bufferRing.initialize();
161 }
162 }
163
164 @Override
165 public int run(IoHandlerContext context) {
166 if (closeCompleted) {
167 if (context.shouldReportActiveIoTime()) {
168 context.reportActiveIoTime(0);
169 }
170 return 0;
171 }
172 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
173 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
174 if (!completionQueue.hasCompletions() && context.canBlock()) {
175 if (eventfdReadSubmitted == 0) {
176 submitEventFdRead();
177 }
178 long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
179 submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
180 } else {
181
182 submitAndClearNow(submissionQueue);
183 }
184
185 int processed;
186 if (context.shouldReportActiveIoTime()) {
187
188 long activeIoStartTimeNanos = System.nanoTime();
189 processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
190 long activeIoEndTimeNanos = System.nanoTime();
191 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
192 } else {
193 processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
194 }
195 return processed;
196 }
197
198 private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
199 CompletionCallback callback) {
200 int processed = 0;
201
202
203 for (int i = 0; i < 128; i++) {
204 int p = completionQueue.process(callback);
205 if ((submissionQueue.flags() & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
206 logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
207 completionQueue.ringEntries);
208 submitAndClearNow(submissionQueue);
209 } else if (p == 0 &&
210
211 (submissionQueue.count() == 0 ||
212
213
214 (submitAndClearNow(submissionQueue) == 0 && !completionQueue.hasCompletions()))) {
215 break;
216 }
217 processed += p;
218 }
219 return processed;
220 }
221
222 private int submitAndClearNow(SubmissionQueue submissionQueue) {
223 int submitted = submissionQueue.submitAndGetNow();
224
225
226
227 iovArray.clear();
228 msgHdrMemoryArray.clear();
229 return submitted;
230 }
231
232 private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
233 throws Errors.NativeIoException {
234 short bufferRingSize = bufferRingConfig.bufferRingSize();
235 short bufferGroupId = bufferRingConfig.bufferGroupId();
236 int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
237 long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
238 if (ioUringBufRingAddr < 0) {
239 throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
240 }
241 return new IoUringBufferRing(ringFd,
242 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
243 bufferRingSize, bufferRingConfig.batchSize(),
244 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
245 bufferRingConfig.isBatchAllocation()
246 );
247 }
248
249 IoUringBufferRing findBufferRing(short bgId) {
250 IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
251 if (cached != null) {
252 return cached;
253 }
254 throw new IllegalArgumentException(
255 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
256 );
257 }
258
259 private static void handleLoopException(Throwable throwable) {
260 logger.warn("Unexpected exception in the IO event loop.", throwable);
261
262
263
264 try {
265 Thread.sleep(100);
266 } catch (InterruptedException ignore) {
267
268 }
269 }
270
271 private boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
272 try {
273 int id = UserData.decodeId(udata);
274 byte op = UserData.decodeOp(udata);
275 short data = UserData.decodeData(udata);
276
277 if (logger.isTraceEnabled()) {
278 logger.trace("completed(ring {}): {}(id={}, res={})",
279 ringBuffer.fd(), Native.opToStr(op), data, res);
280 }
281 if (id == EVENTFD_ID) {
282 handleEventFdRead();
283 return true;
284 }
285 if (id == RINGFD_ID) {
286
287 return true;
288 }
289 DefaultIoUringIoRegistration registration = registrations.get(id);
290 if (registration == null) {
291 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
292 Native.opToStr(op), id, res);
293 return true;
294 }
295 registration.handle(res, flags, op, data, extraCqeData);
296 return true;
297 } catch (Error e) {
298 throw e;
299 } catch (Throwable throwable) {
300 handleLoopException(throwable);
301 return true;
302 }
303 }
304
305 private void handleEventFdRead() {
306 eventfdReadSubmitted = 0;
307 if (!eventFdClosing) {
308 eventfdAsyncNotify.set(false);
309 submitEventFdRead();
310 }
311 }
312
313 private void submitEventFdRead() {
314 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
315 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
316
317 eventfdReadSubmitted = submissionQueue.addEventFdRead(
318 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
319 }
320
321 private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
322 boolean linkTimeout, long timeoutNanoSeconds) {
323 if (timeoutNanoSeconds != -1) {
324 long udata = UserData.encode(RINGFD_ID,
325 linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
326
327
328 long seconds, nanoSeconds;
329 if (timeoutNanoSeconds == 0) {
330 seconds = 0;
331 nanoSeconds = 0;
332 } else {
333 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
334 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
335 }
336
337 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
338 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
339 if (linkTimeout) {
340 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
341 } else {
342 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
343 }
344 }
345 int submitted = submissionQueue.submitAndGet();
346
347
348 iovArray.clear();
349 msgHdrMemoryArray.clear();
350 return submitted;
351 }
352
353 @Override
354 public void prepareToDestroy() {
355 shuttingDown = true;
356 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
357 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
358
359 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
360
361 for (DefaultIoUringIoRegistration registration: copy) {
362 registration.close();
363 }
364
365
366 Native.eventFdWrite(eventfd.intValue(), 1L);
367
368
369 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
370 submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
371
372
373 submissionQueue.submitAndGet();
374
375 while (completionQueue.hasCompletions()) {
376 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
377 if (submissionQueue.count() > 0) {
378 submissionQueue.submitAndGetNow();
379 }
380 }
381 }
382
383 @Override
384 public void destroy() {
385 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
386 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
387 drainEventFd();
388 if (submissionQueue.remaining() < 2) {
389
390
391 submissionQueue.submit();
392 }
393
394 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
395
396
397
398
399
400 submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
401
402 submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
403 completionQueue.process(this::handle);
404 for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
405 ioUringBufferRing.close();
406 }
407 completeRingClose();
408 }
409
410
411
412
413
414 private void drainEventFd() {
415 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
416 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
417 assert !eventFdClosing;
418 eventFdClosing = true;
419 boolean eventPending = eventfdAsyncNotify.getAndSet(true);
420 if (eventPending) {
421
422
423 while (eventfdReadSubmitted == 0) {
424 submitEventFdRead();
425 submissionQueue.submit();
426 }
427
428 class DrainFdEventCallback implements CompletionCallback {
429 boolean eventFdDrained;
430
431 @Override
432 public boolean handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
433 if (UserData.decodeId(udata) == EVENTFD_ID) {
434 eventFdDrained = true;
435 }
436 return IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
437 }
438 }
439 final DrainFdEventCallback handler = new DrainFdEventCallback();
440 completionQueue.process(handler);
441 while (!handler.eventFdDrained) {
442 submissionQueue.submitAndGet();
443 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
444 }
445 }
446
447
448
449 if (eventfdReadSubmitted != 0) {
450 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
451 submissionQueue.addCancel(eventfdReadSubmitted, udata);
452 eventfdReadSubmitted = 0;
453 submissionQueue.submit();
454 }
455 }
456
457 private void completeRingClose() {
458 if (closeCompleted) {
459
460 return;
461 }
462 closeCompleted = true;
463 ringBuffer.close();
464 try {
465 eventfd.close();
466 } catch (IOException e) {
467 logger.warn("Failed to close eventfd", e);
468 }
469 eventfdReadBufCleanable.clean();
470 timeoutMemoryCleanable.clean();
471 iovArray.release();
472 msgHdrMemoryArray.release();
473 }
474
475 @Override
476 public IoRegistration register(IoHandle handle) throws Exception {
477 IoUringIoHandle ioHandle = cast(handle);
478 if (shuttingDown) {
479 throw new IllegalStateException("IoUringIoHandler is shutting down");
480 }
481 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
482 for (;;) {
483 int id = nextRegistrationId();
484 DefaultIoUringIoRegistration old = registrations.put(id, registration);
485 if (old != null) {
486 assert old.handle != registration.handle;
487 registrations.put(id, old);
488 } else {
489 registration.setId(id);
490 break;
491 }
492 }
493
494 return registration;
495 }
496
497 private int nextRegistrationId() {
498 int id;
499 do {
500 id = nextRegistrationId++;
501 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
502 return id;
503 }
504
505 private final class DefaultIoUringIoRegistration implements IoRegistration {
506 private final AtomicBoolean canceled = new AtomicBoolean();
507 private final ThreadAwareExecutor executor;
508 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
509 final IoUringIoHandle handle;
510
511 private boolean removeLater;
512 private int outstandingCompletions;
513 private int id;
514
515 DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
516 this.executor = executor;
517 this.handle = handle;
518 }
519
520 void setId(int id) {
521 this.id = id;
522 }
523
524 @Override
525 public long submit(IoOps ops) {
526 IoUringIoOps ioOps = (IoUringIoOps) ops;
527 if (!isValid()) {
528 return INVALID_ID;
529 }
530 if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
531
532
533 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
534 }
535 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
536 if (executor.isExecutorThread(Thread.currentThread())) {
537 submit0(ioOps, udata);
538 } else {
539 executor.execute(() -> submit0(ioOps, udata));
540 }
541 return udata;
542 }
543
544 private void submit0(IoUringIoOps ioOps, long udata) {
545 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
546 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
547 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
548 );
549 outstandingCompletions++;
550 }
551
552 @SuppressWarnings("unchecked")
553 @Override
554 public <T> T attachment() {
555 return (T) IoUringIoHandler.this;
556 }
557
558 @Override
559 public boolean isValid() {
560 return !canceled.get();
561 }
562
563 @Override
564 public boolean cancel() {
565 if (!canceled.compareAndSet(false, true)) {
566
567 return false;
568 }
569 if (executor.isExecutorThread(Thread.currentThread())) {
570 tryRemove();
571 } else {
572 executor.execute(this::tryRemove);
573 }
574 return true;
575 }
576
577 private void tryRemove() {
578 if (outstandingCompletions > 0) {
579
580
581 removeLater = true;
582 return;
583 }
584 remove();
585 }
586
587 private void remove() {
588 DefaultIoUringIoRegistration old = registrations.remove(id);
589 assert old == this;
590 }
591
592 void close() {
593
594
595 assert executor.isExecutorThread(Thread.currentThread());
596 try {
597 handle.close();
598 } catch (Exception e) {
599 logger.debug("Exception during closing " + handle, e);
600 }
601 }
602
603 void handle(int res, int flags, byte op, short data, ByteBuffer extraCqeData) {
604 event.update(res, flags, op, data, extraCqeData);
605 handle.handle(this, event);
606
607
608 if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
609
610 removeLater = false;
611 remove();
612 }
613 }
614 }
615
616 private static IoUringIoHandle cast(IoHandle handle) {
617 if (handle instanceof IoUringIoHandle) {
618 return (IoUringIoHandle) handle;
619 }
620 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
621 }
622
623 @Override
624 public void wakeup() {
625 if (!executor.isExecutorThread(Thread.currentThread()) &&
626 !eventfdAsyncNotify.getAndSet(true)) {
627
628 Native.eventFdWrite(eventfd.intValue(), 1L);
629 }
630 }
631
632 @Override
633 public boolean isCompatible(Class<? extends IoHandle> handleType) {
634 return IoUringIoHandle.class.isAssignableFrom(handleType);
635 }
636
637 IovArray iovArray() {
638 if (iovArray.isFull()) {
639
640 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
641 }
642 assert iovArray.count() == 0;
643 return iovArray;
644 }
645
646 MsgHdrMemoryArray msgHdrMemoryArray() {
647 if (msgHdrMemoryArray.isFull()) {
648
649 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
650 }
651 return msgHdrMemoryArray;
652 }
653
654
655
656
657 byte[] inet4AddressArray() {
658 return inet4AddressArray;
659 }
660
661
662
663
664 byte[] inet6AddressArray() {
665 return inet6AddressArray;
666 }
667
668
669
670
671
672
673 public static IoHandlerFactory newFactory() {
674 return newFactory(new IoUringIoHandlerConfig());
675 }
676
677
678
679
680
681
682
683
684 public static IoHandlerFactory newFactory(int ringSize) {
685 IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
686 configuration.setRingSize(ringSize);
687 return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
688 }
689
690
691
692
693
694
695
696 public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
697 IoUring.ensureAvailability();
698 ObjectUtil.checkNotNull(config, "config");
699 return new IoHandlerFactory() {
700 @Override
701 public IoHandler newHandler(ThreadAwareExecutor eventLoop) {
702 return new IoUringIoHandler(eventLoop, config);
703 }
704
705 @Override
706 public boolean isChangingThreadSupported() {
707 return !config.singleIssuer();
708 }
709 };
710 }
711 }