1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoHandle;
19 import io.netty.channel.IoHandler;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.channel.unix.Buffer;
25 import io.netty.channel.unix.Errors;
26 import io.netty.channel.unix.FileDescriptor;
27 import io.netty.channel.unix.IovArray;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.ThreadAwareExecutor;
31 import io.netty.util.internal.CleanableDirectBuffer;
32 import io.netty.util.internal.ObjectUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.io.UncheckedIOException;
39 import java.nio.ByteBuffer;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static java.lang.Math.max;
47 import static java.lang.Math.min;
48 import static java.util.Objects.requireNonNull;
49
50
51
52
53 public final class IoUringIoHandler implements IoHandler {
54 private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
55
56 private final RingBuffer ringBuffer;
57 private final IntObjectMap<IoUringBufferRing> registeredIoUringBufferRing;
58 private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
59
60 private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
61 private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
62
63 private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
64 private final FileDescriptor eventfd;
65 private final CleanableDirectBuffer eventfdReadBufCleanable;
66 private final ByteBuffer eventfdReadBuf;
67 private final long eventfdReadBufAddress;
68 private final CleanableDirectBuffer timeoutMemoryCleanable;
69 private final ByteBuffer timeoutMemory;
70 private final long timeoutMemoryAddress;
71 private final IovArray iovArray;
72 private final MsgHdrMemoryArray msgHdrMemoryArray;
73 private long eventfdReadSubmitted;
74 private boolean eventFdClosing;
75 private volatile boolean shuttingDown;
76 private boolean closeCompleted;
77 private int nextRegistrationId = Integer.MIN_VALUE;
78
79
80 private static final int EVENTFD_ID = Integer.MAX_VALUE;
81 private static final int RINGFD_ID = EVENTFD_ID - 1;
82 private static final int INVALID_ID = 0;
83
84 private static final int KERNEL_TIMESPEC_SIZE = 16;
85
86 private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
87 private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
88
89 private final ThreadAwareExecutor executor;
90
91 IoUringIoHandler(ThreadAwareExecutor executor, IoUringIoHandlerConfig config) {
92
93 IoUring.ensureAvailability();
94 this.executor = requireNonNull(executor, "executor");
95 requireNonNull(config, "config");
96 int setupFlags = Native.setupFlags(config.singleIssuer());
97
98
99
100 int cqSize = 2 * config.getRingSize();
101 if (config.needSetupCqeSize()) {
102 assert IoUring.isSetupCqeSizeSupported();
103 setupFlags |= Native.IORING_SETUP_CQSIZE;
104 cqSize = config.getCqSize();
105 }
106 this.ringBuffer = Native.createRingBuffer(config.getRingSize(), cqSize, setupFlags);
107 if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
108 int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
109 int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
110 int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
111 if (result < 0) {
112
113 ringBuffer.close();
114 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
115 }
116 }
117
118 registeredIoUringBufferRing = new IntObjectHashMap<>();
119 Collection<IoUringBufferRingConfig> bufferRingConfigs = config.getInternBufferRingConfigs();
120 if (bufferRingConfigs != null && !bufferRingConfigs.isEmpty()) {
121 for (IoUringBufferRingConfig bufferRingConfig : bufferRingConfigs) {
122 try {
123 IoUringBufferRing ring = newBufferRing(ringBuffer.fd(), bufferRingConfig);
124 registeredIoUringBufferRing.put(bufferRingConfig.bufferGroupId(), ring);
125 } catch (Errors.NativeIoException e) {
126 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
127 bufferRing.close();
128 }
129
130 ringBuffer.close();
131 throw new UncheckedIOException(e);
132 }
133 }
134 }
135
136 registrations = new IntObjectHashMap<>();
137 eventfd = Native.newBlockingEventFd();
138 eventfdReadBufCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
139 eventfdReadBuf = eventfdReadBufCleanable.buffer();
140 eventfdReadBufAddress = Buffer.memoryAddress(eventfdReadBuf);
141 timeoutMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(KERNEL_TIMESPEC_SIZE);
142 timeoutMemory = timeoutMemoryCleanable.buffer();
143 timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
144 iovArray = new IovArray(IoUring.NUM_ELEMENTS_IOVEC);
145 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1024);
146 }
147
148 @Override
149 public void initialize() {
150 ringBuffer.enable();
151
152 for (IoUringBufferRing bufferRing : registeredIoUringBufferRing.values()) {
153 bufferRing.initialize();
154 }
155 }
156
157 @Override
158 public int run(IoHandlerContext context) {
159 if (closeCompleted) {
160 if (context.shouldReportActiveIoTime()) {
161 context.reportActiveIoTime(0);
162 }
163 return 0;
164 }
165 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
166 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
167 if (!completionQueue.hasCompletions() && context.canBlock()) {
168 if (eventfdReadSubmitted == 0) {
169 submitEventFdRead();
170 }
171 long timeoutNanos = context.deadlineNanos() == -1 ? -1 : context.delayNanos(System.nanoTime());
172 submitAndWaitWithTimeout(submissionQueue, false, timeoutNanos);
173 } else {
174
175 submitAndClearNow(submissionQueue);
176 }
177
178 int processed;
179 if (context.shouldReportActiveIoTime()) {
180
181 long activeIoStartTimeNanos = System.nanoTime();
182 processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
183 long activeIoEndTimeNanos = System.nanoTime();
184 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
185 } else {
186 processed = processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
187 }
188 return processed;
189 }
190
191 private boolean needSubmit(int sqFlags) {
192 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
193 return submissionQueue.count() > 0
194 || (sqFlags & (Native.IORING_SQ_CQ_OVERFLOW | Native.IORING_SQ_TASKRUN)) != 0;
195 }
196
197 private int processCompletionsAndHandleOverflow(SubmissionQueue submissionQueue, CompletionQueue completionQueue,
198 CompletionCallback callback) {
199 int processed = 0;
200
201
202 for (int i = 0; i < 128; i++) {
203 int p = completionQueue.process(callback);
204 int sqFlags = submissionQueue.flags();
205 if ((sqFlags & Native.IORING_SQ_CQ_OVERFLOW) != 0) {
206 logger.warn("CompletionQueue overflow detected, consider increasing size: {} ",
207 completionQueue.ringEntries);
208 }
209 if (p == 0) {
210 if (!needSubmit(sqFlags)) {
211 break;
212 }
213 submitAndClearNow0(submissionQueue);
214 }
215 processed += p;
216 }
217 return processed;
218 }
219
220 private int submitAndClearNow(SubmissionQueue submissionQueue) {
221 if (needSubmit(submissionQueue.flags())) {
222 return submitAndClearNow0(submissionQueue);
223 }
224 return 0;
225 }
226
227 private int submitAndClearNow0(SubmissionQueue submissionQueue) {
228
229 int submitted = submissionQueue.submitAndGetNow();
230
231
232
233 iovArray.clear();
234 msgHdrMemoryArray.clear();
235 return submitted;
236 }
237
238 private static IoUringBufferRing newBufferRing(int ringFd, IoUringBufferRingConfig bufferRingConfig)
239 throws Errors.NativeIoException {
240 short bufferRingSize = bufferRingConfig.bufferRingSize();
241 short bufferGroupId = bufferRingConfig.bufferGroupId();
242 int flags = bufferRingConfig.isIncremental() ? Native.IOU_PBUF_RING_INC : 0;
243 long ioUringBufRingAddr = Native.ioUringRegisterBufRing(ringFd, bufferRingSize, bufferGroupId, flags);
244 if (ioUringBufRingAddr < 0) {
245 throw Errors.newIOException("ioUringRegisterBufRing", (int) ioUringBufRingAddr);
246 }
247 return new IoUringBufferRing(ringFd,
248 Buffer.wrapMemoryAddressWithNativeOrder(ioUringBufRingAddr, Native.ioUringBufRingSize(bufferRingSize)),
249 bufferRingSize, bufferRingConfig.batchSize(),
250 bufferGroupId, bufferRingConfig.isIncremental(), bufferRingConfig.allocator(),
251 bufferRingConfig.isBatchAllocation()
252 );
253 }
254
255 IoUringBufferRing findBufferRing(short bgId) {
256 IoUringBufferRing cached = registeredIoUringBufferRing.get(bgId);
257 if (cached != null) {
258 return cached;
259 }
260 throw new IllegalArgumentException(
261 String.format("Cant find bgId:%d, please register it in ioUringIoHandler", bgId)
262 );
263 }
264
265 private static void handleLoopException(Throwable throwable) {
266 logger.warn("Unexpected exception in the IO event loop.", throwable);
267
268
269
270 try {
271 Thread.sleep(100);
272 } catch (InterruptedException ignore) {
273
274 }
275 }
276
277 private void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
278 try {
279 int id = UserData.decodeId(udata);
280 byte op = UserData.decodeOp(udata);
281 short data = UserData.decodeData(udata);
282
283 if (logger.isTraceEnabled()) {
284 logger.trace("completed(ring {}): {}(id={}, res={})",
285 ringBuffer.fd(), Native.opToStr(op), data, res);
286 }
287 if (id == EVENTFD_ID) {
288 handleEventFdRead();
289 return;
290 }
291 if (id == RINGFD_ID) {
292
293 return;
294 }
295 DefaultIoUringIoRegistration registration = registrations.get(id);
296 if (registration == null) {
297 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
298 Native.opToStr(op), id, res);
299 return;
300 }
301 registration.handle(res, flags, op, data, extraCqeData);
302 } catch (Error e) {
303 throw e;
304 } catch (Throwable throwable) {
305 handleLoopException(throwable);
306 }
307 }
308
309 private void handleEventFdRead() {
310 eventfdReadSubmitted = 0;
311 if (!eventFdClosing) {
312 eventfdAsyncNotify.set(false);
313 submitEventFdRead();
314 }
315 }
316
317 private void submitEventFdRead() {
318 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
319 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
320
321 eventfdReadSubmitted = submissionQueue.addEventFdRead(
322 eventfd.intValue(), eventfdReadBufAddress, 0, 8, udata);
323 }
324
325 private int submitAndWaitWithTimeout(SubmissionQueue submissionQueue,
326 boolean linkTimeout, long timeoutNanoSeconds) {
327 if (timeoutNanoSeconds != -1) {
328 long udata = UserData.encode(RINGFD_ID,
329 linkTimeout ? Native.IORING_OP_LINK_TIMEOUT : Native.IORING_OP_TIMEOUT, (short) 0);
330
331
332 long seconds, nanoSeconds;
333 if (timeoutNanoSeconds == 0) {
334 seconds = 0;
335 nanoSeconds = 0;
336 } else {
337 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
338 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
339 }
340
341 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
342 timeoutMemory.putLong(KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
343 if (linkTimeout) {
344 submissionQueue.addLinkTimeout(timeoutMemoryAddress, udata);
345 } else {
346 submissionQueue.addTimeout(timeoutMemoryAddress, udata);
347 }
348 }
349 int submitted = submissionQueue.submitAndGet();
350
351
352 iovArray.clear();
353 msgHdrMemoryArray.clear();
354 return submitted;
355 }
356
357 @Override
358 public void prepareToDestroy() {
359 shuttingDown = true;
360 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
361 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
362
363 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
364
365 for (DefaultIoUringIoRegistration registration: copy) {
366 registration.close();
367 }
368
369
370 Native.eventFdWrite(eventfd.intValue(), 1L);
371
372
373 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
374 submissionQueue.addNop((byte) Native.IOSQE_IO_DRAIN, udata);
375
376
377 submissionQueue.submitAndGet();
378
379 while (completionQueue.hasCompletions()) {
380 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, this::handle);
381 if (submissionQueue.count() > 0) {
382 submissionQueue.submitAndGetNow();
383 }
384 }
385 }
386
387 @Override
388 public void destroy() {
389 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
390 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
391 drainEventFd();
392 if (submissionQueue.remaining() < 2) {
393
394
395 submissionQueue.submit();
396 }
397
398 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
399
400
401
402
403
404 submissionQueue.addNop((byte) (Native.IOSQE_IO_DRAIN | Native.IOSQE_LINK), udata);
405
406 submitAndWaitWithTimeout(submissionQueue, true, TimeUnit.MILLISECONDS.toNanos(200));
407 completionQueue.process(this::handle);
408 for (IoUringBufferRing ioUringBufferRing : registeredIoUringBufferRing.values()) {
409 ioUringBufferRing.close();
410 }
411 completeRingClose();
412 }
413
414
415
416
417
418 private void drainEventFd() {
419 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
420 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
421 assert !eventFdClosing;
422 eventFdClosing = true;
423 boolean eventPending = eventfdAsyncNotify.getAndSet(true);
424 if (eventPending) {
425
426
427 while (eventfdReadSubmitted == 0) {
428 submitEventFdRead();
429 submissionQueue.submit();
430 }
431
432 class DrainFdEventCallback implements CompletionCallback {
433 boolean eventFdDrained;
434
435 @Override
436 public void handle(int res, int flags, long udata, ByteBuffer extraCqeData) {
437 if (UserData.decodeId(udata) == EVENTFD_ID) {
438 eventFdDrained = true;
439 }
440 IoUringIoHandler.this.handle(res, flags, udata, extraCqeData);
441 }
442 }
443 final DrainFdEventCallback handler = new DrainFdEventCallback();
444 completionQueue.process(handler);
445 while (!handler.eventFdDrained) {
446 submissionQueue.submitAndGet();
447 processCompletionsAndHandleOverflow(submissionQueue, completionQueue, handler);
448 }
449 }
450
451
452
453 if (eventfdReadSubmitted != 0) {
454 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
455 submissionQueue.addCancel(eventfdReadSubmitted, udata);
456 eventfdReadSubmitted = 0;
457 submissionQueue.submit();
458 }
459 }
460
461 private void completeRingClose() {
462 if (closeCompleted) {
463
464 return;
465 }
466 closeCompleted = true;
467 ringBuffer.close();
468 try {
469 eventfd.close();
470 } catch (IOException e) {
471 logger.warn("Failed to close eventfd", e);
472 }
473 eventfdReadBufCleanable.clean();
474 timeoutMemoryCleanable.clean();
475 iovArray.release();
476 msgHdrMemoryArray.release();
477 }
478
479 @Override
480 public IoRegistration register(IoHandle handle) throws Exception {
481 IoUringIoHandle ioHandle = cast(handle);
482 if (shuttingDown) {
483 throw new IllegalStateException("IoUringIoHandler is shutting down");
484 }
485 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(executor, ioHandle);
486 for (;;) {
487 int id = nextRegistrationId();
488 DefaultIoUringIoRegistration old = registrations.put(id, registration);
489 if (old != null) {
490 assert old.handle != registration.handle;
491 registrations.put(id, old);
492 } else {
493 registration.setId(id);
494 ioHandle.registered();
495 break;
496 }
497 }
498
499 return registration;
500 }
501
502 private int nextRegistrationId() {
503 int id;
504 do {
505 id = nextRegistrationId++;
506 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
507 return id;
508 }
509
510 private final class DefaultIoUringIoRegistration implements IoRegistration {
511 private final AtomicBoolean canceled = new AtomicBoolean();
512 private final ThreadAwareExecutor executor;
513 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
514 final IoUringIoHandle handle;
515
516 private boolean removeLater;
517 private int outstandingCompletions;
518 private int id;
519
520 DefaultIoUringIoRegistration(ThreadAwareExecutor executor, IoUringIoHandle handle) {
521 this.executor = executor;
522 this.handle = handle;
523 }
524
525 void setId(int id) {
526 this.id = id;
527 }
528
529 @Override
530 public long submit(IoOps ops) {
531 IoUringIoOps ioOps = (IoUringIoOps) ops;
532 if (!isValid()) {
533 return INVALID_ID;
534 }
535 if ((ioOps.flags() & Native.IOSQE_CQE_SKIP_SUCCESS) != 0) {
536
537
538 throw new IllegalArgumentException("IOSQE_CQE_SKIP_SUCCESS not supported");
539 }
540 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
541 if (executor.isExecutorThread(Thread.currentThread())) {
542 submit0(ioOps, udata);
543 } else {
544 executor.execute(() -> submit0(ioOps, udata));
545 }
546 return udata;
547 }
548
549 private void submit0(IoUringIoOps ioOps, long udata) {
550 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
551 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
552 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
553 );
554 outstandingCompletions++;
555 }
556
557 @SuppressWarnings("unchecked")
558 @Override
559 public <T> T attachment() {
560 return (T) IoUringIoHandler.this;
561 }
562
563 @Override
564 public boolean isValid() {
565 return !canceled.get();
566 }
567
568 @Override
569 public boolean cancel() {
570 if (!canceled.compareAndSet(false, true)) {
571
572 return false;
573 }
574 if (executor.isExecutorThread(Thread.currentThread())) {
575 tryRemove();
576 } else {
577 executor.execute(this::tryRemove);
578 }
579 return true;
580 }
581
582 private void tryRemove() {
583 if (outstandingCompletions > 0) {
584
585
586 removeLater = true;
587 return;
588 }
589 remove();
590 }
591
592 private void remove() {
593 DefaultIoUringIoRegistration old = registrations.remove(id);
594 assert old == this;
595 handle.unregistered();
596 }
597
598 void close() {
599
600
601 assert executor.isExecutorThread(Thread.currentThread());
602 try {
603 handle.close();
604 } catch (Exception e) {
605 logger.debug("Exception during closing " + handle, e);
606 }
607 }
608
609 void handle(int res, int flags, byte op, short data, ByteBuffer extraCqeData) {
610 event.update(res, flags, op, data, extraCqeData);
611 handle.handle(this, event);
612
613
614 if ((flags & Native.IORING_CQE_F_MORE) == 0 && --outstandingCompletions == 0 && removeLater) {
615
616 removeLater = false;
617 remove();
618 }
619 }
620 }
621
622 private static IoUringIoHandle cast(IoHandle handle) {
623 if (handle instanceof IoUringIoHandle) {
624 return (IoUringIoHandle) handle;
625 }
626 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
627 }
628
629 @Override
630 public void wakeup() {
631 if (!executor.isExecutorThread(Thread.currentThread()) &&
632 !eventfdAsyncNotify.getAndSet(true)) {
633
634 Native.eventFdWrite(eventfd.intValue(), 1L);
635 }
636 }
637
638 @Override
639 public boolean isCompatible(Class<? extends IoHandle> handleType) {
640 return IoUringIoHandle.class.isAssignableFrom(handleType);
641 }
642
643 IovArray iovArray() {
644 if (iovArray.isFull()) {
645
646 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
647 }
648 assert iovArray.count() == 0;
649 return iovArray;
650 }
651
652 MsgHdrMemoryArray msgHdrMemoryArray() {
653 if (msgHdrMemoryArray.isFull()) {
654
655 submitAndClearNow(ringBuffer.ioUringSubmissionQueue());
656 }
657 return msgHdrMemoryArray;
658 }
659
660
661
662
663 byte[] inet4AddressArray() {
664 return inet4AddressArray;
665 }
666
667
668
669
670 byte[] inet6AddressArray() {
671 return inet6AddressArray;
672 }
673
674
675
676
677
678
679 public static IoHandlerFactory newFactory() {
680 return newFactory(new IoUringIoHandlerConfig());
681 }
682
683
684
685
686
687
688
689
690 public static IoHandlerFactory newFactory(int ringSize) {
691 IoUringIoHandlerConfig configuration = new IoUringIoHandlerConfig();
692 configuration.setRingSize(ringSize);
693 return eventLoop -> new IoUringIoHandler(eventLoop, configuration);
694 }
695
696
697
698
699
700
701
702 public static IoHandlerFactory newFactory(IoUringIoHandlerConfig config) {
703 IoUring.ensureAvailability();
704 final IoUringIoHandlerConfig copy = ObjectUtil.checkNotNull(config, "config").verifyAndClone();
705 return new IoHandlerFactory() {
706 @Override
707 public IoHandler newHandler(ThreadAwareExecutor eventLoop) {
708 return new IoUringIoHandler(eventLoop, copy);
709 }
710
711 @Override
712 public boolean isChangingThreadSupported() {
713 return !copy.singleIssuer();
714 }
715 };
716 }
717 }