1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoEventLoop;
19 import io.netty.channel.IoExecutionContext;
20 import io.netty.channel.IoHandle;
21 import io.netty.channel.IoHandler;
22 import io.netty.channel.IoHandlerFactory;
23 import io.netty.channel.IoOps;
24 import io.netty.channel.IoRegistration;
25 import io.netty.channel.unix.Errors;
26 import io.netty.channel.unix.FileDescriptor;
27 import io.netty.util.collection.IntObjectHashMap;
28 import io.netty.util.collection.IntObjectMap;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.Promise;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.PlatformDependent;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.io.UncheckedIOException;
39 import java.util.ArrayList;
40 import java.util.List;
41 import java.util.concurrent.RejectedExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44
45 import static java.util.Objects.requireNonNull;
46
47
48
49
50 public final class IoUringIoHandler implements IoHandler {
51
52
53 static final IoUringIoOps SUBMIT_AND_RUN_ALL = new IoUringIoOps(
54 (byte) -1, (byte) -1, (short) -1, -1, -1, -1, -1, -1, (short) -1, (short) -1, (short) -1, -1, -1);
55
56 private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
57 private static final short RING_CLOSE = 1;
58
59 private final RingBuffer ringBuffer;
60 private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
61
62 private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
63 private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
64
65 private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
66 private final FileDescriptor eventfd;
67 private final long eventfdReadBuf;
68
69 private long eventfdReadSubmitted;
70 private boolean eventFdClosing;
71 private volatile boolean shuttingDown;
72 private boolean closeCompleted;
73 private int nextRegistrationId = Integer.MIN_VALUE;
74 private int processedPerRun;
75
76
77 private static final int EVENTFD_ID = Integer.MAX_VALUE;
78 private static final int RINGFD_ID = EVENTFD_ID - 1;
79 private static final int INVALID_ID = 0;
80
81 private final CompletionBuffer completionBuffer;
82
83 IoUringIoHandler(IoUringIoHandlerConfiguration config) {
84
85 IoUring.ensureAvailability();
86 requireNonNull(config, "config");
87 this.ringBuffer = Native.createRingBuffer(config.getRingSize());
88 if (IoUring.isRegisterIowqMaxWorkersSupported() && config.needRegisterIowqMaxWorker()) {
89 int maxBoundedWorker = Math.max(config.getMaxBoundedWorker(), 0);
90 int maxUnboundedWorker = Math.max(config.getMaxUnboundedWorker(), 0);
91 int result = Native.ioUringRegisterIoWqMaxWorkers(ringBuffer.fd(), maxBoundedWorker, maxUnboundedWorker);
92 if (result < 0) {
93
94 ringBuffer.close();
95 throw new UncheckedIOException(Errors.newIOException("io_uring_register", result));
96 }
97 }
98 registrations = new IntObjectHashMap<>();
99 eventfd = Native.newBlockingEventFd();
100 eventfdReadBuf = PlatformDependent.allocateMemory(8);
101
102
103
104 completionBuffer = new CompletionBuffer(ringBuffer.ioUringCompletionQueue().ringSize * 2, 0);
105 }
106
107 @Override
108 public int run(IoExecutionContext context) {
109 processedPerRun = 0;
110 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
111 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
112 if (!completionQueue.hasCompletions() && context.canBlock()) {
113 if (eventfdReadSubmitted == 0) {
114 submitEventFdRead();
115 }
116 if (context.deadlineNanos() != -1) {
117 submitTimeout(context);
118 }
119 submissionQueue.submitAndWait();
120 } else {
121 submissionQueue.submit();
122 }
123
124
125 processedPerRun += drainAndProcessAll(completionQueue, this::handle);
126
127
128 submissionQueue.submit();
129
130 return processedPerRun;
131 }
132
133 private void submitAndRunNow() {
134 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
135 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
136 if (submissionQueue.submit() > 0) {
137 processedPerRun += drainAndProcessAll(completionQueue, this::handle);
138 }
139 }
140
141 private int drainAndProcessAll(CompletionQueue completionQueue, CompletionCallback callback) {
142 int processed = 0;
143 for (;;) {
144 boolean drainedAll = completionBuffer.drain(completionQueue);
145 processed += completionBuffer.processNow(callback);
146 if (drainedAll) {
147 break;
148 }
149 }
150 return processed;
151 }
152
153 private static void handleLoopException(Throwable throwable) {
154 logger.warn("Unexpected exception in the IO event loop.", throwable);
155
156
157
158 try {
159 Thread.sleep(100);
160 } catch (InterruptedException ignore) {
161
162 }
163 }
164
165 private boolean handle(int res, int flags, long udata) {
166 try {
167 int id = UserData.decodeId(udata);
168 byte op = UserData.decodeOp(udata);
169 short data = UserData.decodeData(udata);
170
171 if (logger.isTraceEnabled()) {
172 logger.trace("completed(ring {}): {}(id={}, res={})",
173 ringBuffer.fd(), Native.opToStr(op), data, res);
174 }
175 if (id == EVENTFD_ID) {
176 handleEventFdRead();
177 return true;
178 }
179 if (id == RINGFD_ID) {
180 if (op == Native.IORING_OP_NOP && data == RING_CLOSE) {
181 completeRingClose();
182 }
183 return true;
184 }
185 DefaultIoUringIoRegistration registration = registrations.get(id);
186 if (registration == null) {
187 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
188 Native.opToStr(op), id, res);
189 return true;
190 }
191 registration.handle(res, flags, op, data);
192 return true;
193 } catch (Error e) {
194 throw e;
195 } catch (Throwable throwable) {
196 handleLoopException(throwable);
197 return true;
198 }
199 }
200
201 private void handleEventFdRead() {
202 eventfdReadSubmitted = 0;
203 if (!eventFdClosing) {
204 eventfdAsyncNotify.set(false);
205 submitEventFdRead();
206 }
207 }
208
209 private void submitEventFdRead() {
210 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
211 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_READ, (short) 0);
212
213 eventfdReadSubmitted = submissionQueue.addEventFdRead(
214 eventfd.intValue(), eventfdReadBuf, 0, 8, udata);
215 }
216
217 private void submitTimeout(IoExecutionContext context) {
218 long delayNanos = context.delayNanos(System.nanoTime());
219 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_TIMEOUT, (short) 0);
220
221 ringBuffer.ioUringSubmissionQueue().addTimeout(ringBuffer.fd(), delayNanos, udata);
222 }
223
224 @Override
225 public void prepareToDestroy() {
226 shuttingDown = true;
227 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
228 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
229
230 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
231
232 for (DefaultIoUringIoRegistration registration: copy) {
233 registration.close();
234 }
235
236
237 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
238 submissionQueue.addNop(ringBuffer.fd(), (byte) Native.IOSQE_IO_DRAIN, udata);
239 submissionQueue.submit();
240 while (completionQueue.hasCompletions()) {
241 completionQueue.process(this::handle);
242
243 if (submissionQueue.count() > 0) {
244 submissionQueue.submit();
245 }
246 }
247 }
248
249 @Override
250 public void destroy() {
251 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
252 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
253 drainEventFd();
254 if (submissionQueue.remaining() < 2) {
255
256
257 submissionQueue.submit();
258 }
259
260 long udata = UserData.encode(RINGFD_ID, Native.IORING_OP_NOP, (short) 0);
261 submissionQueue.addNop(ringBuffer.fd(), (byte) Native.IOSQE_IO_DRAIN, udata);
262
263 udata = UserData.encode(RINGFD_ID, Native.IORING_OP_LINK_TIMEOUT, (short) 0);
264 submissionQueue.addLinkTimeout(ringBuffer.fd(), TimeUnit.MILLISECONDS.toNanos(200), udata);
265 submissionQueue.submitAndWait();
266 completionQueue.process(this::handle);
267 completeRingClose();
268 }
269
270
271
272
273
274 private void drainEventFd() {
275 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
276 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
277 assert !eventFdClosing;
278 eventFdClosing = true;
279 boolean eventPending = eventfdAsyncNotify.getAndSet(true);
280 if (eventPending) {
281
282
283 while (eventfdReadSubmitted == 0) {
284 submitEventFdRead();
285 submissionQueue.submit();
286 }
287
288 class DrainFdEventCallback implements CompletionCallback {
289 boolean eventFdDrained;
290
291 @Override
292 public boolean handle(int res, int flags, long udata) {
293 if (UserData.decodeId(udata) == EVENTFD_ID) {
294 eventFdDrained = true;
295 }
296 return IoUringIoHandler.this.handle(res, flags, udata);
297 }
298 }
299 final DrainFdEventCallback handler = new DrainFdEventCallback();
300 drainAndProcessAll(completionQueue, handler);
301 completionQueue.process(handler);
302 while (!handler.eventFdDrained) {
303 submissionQueue.submitAndWait();
304 drainAndProcessAll(completionQueue, handler);
305 }
306 }
307
308
309
310 if (eventfdReadSubmitted != 0) {
311 long udata = UserData.encode(EVENTFD_ID, Native.IORING_OP_ASYNC_CANCEL, (short) 0);
312 submissionQueue.addCancel(eventfd.intValue(), eventfdReadSubmitted, udata);
313 eventfdReadSubmitted = 0;
314 submissionQueue.submit();
315 }
316 }
317
318 private void completeRingClose() {
319 if (closeCompleted) {
320
321 return;
322 }
323 closeCompleted = true;
324 ringBuffer.close();
325 try {
326 eventfd.close();
327 } catch (IOException e) {
328 logger.warn("Failed to close eventfd", e);
329 }
330 PlatformDependent.freeMemory(eventfdReadBuf);
331 }
332
333 @Override
334 public IoRegistration register(IoEventLoop eventLoop, IoHandle handle) throws Exception {
335 IoUringIoHandle ioHandle = cast(handle);
336 if (shuttingDown) {
337 throw new RejectedExecutionException("IoEventLoop is shutting down");
338 }
339 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(eventLoop, ioHandle);
340 for (;;) {
341 int id = nextRegistrationId();
342 DefaultIoUringIoRegistration old = registrations.put(id, registration);
343 if (old != null) {
344 assert old.handle != registration.handle;
345 registrations.put(id, old);
346 } else {
347 registration.setId(id);
348 break;
349 }
350 }
351
352 return registration;
353 }
354
355 private int nextRegistrationId() {
356 int id;
357 do {
358 id = nextRegistrationId++;
359 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
360 return id;
361 }
362
363 private final class DefaultIoUringIoRegistration implements IoUringIoRegistration {
364 private final Promise<?> cancellationPromise;
365 private final IoEventLoop eventLoop;
366 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
367 final IoUringIoHandle handle;
368
369 private boolean removeLater;
370 private int outstandingCompletions;
371 private int id;
372
373 DefaultIoUringIoRegistration(IoEventLoop eventLoop, IoUringIoHandle handle) {
374 this.eventLoop = eventLoop;
375 this.handle = handle;
376 this.cancellationPromise = eventLoop.newPromise();
377 }
378
379 void setId(int id) {
380 this.id = id;
381 }
382
383 @Override
384 public long submit(IoOps ops) {
385 IoUringIoOps ioOps = (IoUringIoOps) ops;
386 if (!isValid()) {
387 return INVALID_ID;
388 }
389 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
390 if (eventLoop.inEventLoop()) {
391 submit0(ioOps, udata);
392 } else {
393 eventLoop.execute(() -> submit0(ioOps, udata));
394 }
395 return udata;
396 }
397
398 private void submit0(IoUringIoOps ioOps, long udata) {
399 if (ioOps == SUBMIT_AND_RUN_ALL) {
400 submitAndRunNow();
401 } else {
402 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
403 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
404 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
405 );
406 outstandingCompletions++;
407 }
408 }
409
410 @Override
411 public IoUringIoHandler ioHandler() {
412 return IoUringIoHandler.this;
413 }
414
415 @Override
416 public void cancel() {
417 if (!cancellationPromise.trySuccess(null)) {
418
419 return;
420 }
421 if (eventLoop.inEventLoop()) {
422 tryRemove();
423 } else {
424 eventLoop.execute(this::tryRemove);
425 }
426 }
427
428 @Override
429 public Future<?> cancelFuture() {
430 return cancellationPromise;
431 }
432
433 private void tryRemove() {
434 if (outstandingCompletions > 0) {
435
436
437 removeLater = true;
438 return;
439 }
440 remove();
441 }
442
443 private void remove() {
444 DefaultIoUringIoRegistration old = registrations.remove(id);
445 assert old == this;
446 }
447
448 void close() {
449
450
451 assert eventLoop.inEventLoop();
452 try {
453 handle.close();
454 } catch (Exception e) {
455 logger.debug("Exception during closing " + handle, e);
456 }
457 }
458
459 void handle(int res, int flags, byte op, short data) {
460 event.update(res, flags, op, data);
461 handle.handle(this, event);
462 if (--outstandingCompletions == 0 && removeLater) {
463
464 removeLater = false;
465 remove();
466 }
467 }
468 }
469
470 private static IoUringIoHandle cast(IoHandle handle) {
471 if (handle instanceof IoUringIoHandle) {
472 return (IoUringIoHandle) handle;
473 }
474 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
475 }
476
477 @Override
478 public void wakeup(IoEventLoop eventLoop) {
479 if (!eventLoop.inEventLoop() && !eventfdAsyncNotify.getAndSet(true)) {
480
481 Native.eventFdWrite(eventfd.intValue(), 1L);
482 }
483 }
484
485 @Override
486 public boolean isCompatible(Class<? extends IoHandle> handleType) {
487 return IoUringIoHandle.class.isAssignableFrom(handleType);
488 }
489
490
491
492
493 byte[] inet4AddressArray() {
494 return inet4AddressArray;
495 }
496
497
498
499
500 byte[] inet6AddressArray() {
501 return inet6AddressArray;
502 }
503
504
505
506
507
508
509 public static IoHandlerFactory newFactory() {
510 return newFactory(new IoUringIoHandlerConfiguration());
511 }
512
513
514
515
516
517
518
519
520 public static IoHandlerFactory newFactory(int ringSize) {
521 IoUringIoHandlerConfiguration configuration = new IoUringIoHandlerConfiguration();
522 configuration.setRingSize(ringSize);
523 return () -> new IoUringIoHandler(configuration);
524 }
525
526
527
528
529
530
531
532 public static IoHandlerFactory newFactory(IoUringIoHandlerConfiguration config) {
533 IoUring.ensureAvailability();
534 ObjectUtil.checkNotNull(config, "config");
535 return () -> new IoUringIoHandler(config);
536 }
537
538 }