1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoEventLoop;
19 import io.netty.channel.IoExecutionContext;
20 import io.netty.channel.IoHandle;
21 import io.netty.channel.IoHandler;
22 import io.netty.channel.IoHandlerFactory;
23 import io.netty.channel.IoOps;
24 import io.netty.channel.IoRegistration;
25 import io.netty.channel.unix.FileDescriptor;
26 import io.netty.util.collection.IntObjectHashMap;
27 import io.netty.util.collection.IntObjectMap;
28 import io.netty.util.concurrent.Future;
29 import io.netty.util.concurrent.Promise;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.StringUtil;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42
43 import static java.util.Objects.requireNonNull;
44
45
46
47
48 public final class IoUringIoHandler implements IoHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
// Completion "data" value that, on a NOP completion tagged with RINGFD_ID, makes handle(...) call
// completeRingClose().
private static final short RING_CLOSE = 1;

// The ring this handler submits to and reaps completions from.
private final RingBuffer ringBuffer;
// Registration id -> registration; entries are added by register(...) and removed by the registration itself.
private final IntObjectMap<DefaultIoUringIoRegistration> registrations;

// Byte arrays sized for a single IPv4 / IPv6 address, handed out by the package-private accessors.
// NOTE(review): presumably reused as scratch space by callers on the event loop - confirm.
private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];

// wakeup(...) only writes to the eventfd when it flips this from false to true; handleEventFdRead() resets it
// to false (unless the eventfd is being closed).
private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
// Eventfd used by wakeup(...); created in the constructor and closed in completeRingClose().
private final FileDescriptor eventfd;
// Address of the 8 bytes allocated in the constructor and passed to addEventFdRead(...); freed in
// completeRingClose().
private final long eventfdReadBuf;

// Value returned by addEventFdRead(...) for the currently submitted eventfd read; 0 is treated as "none submitted".
private long eventfdReadSubmitted;
// Set by drainEventFd(); once true, handleEventFdRead() no longer re-submits the eventfd read.
private boolean eventFdClosing;
// Set by prepareToDestroy(); register(...) rejects new registrations once this is true.
private volatile boolean shuttingDown;
// Makes completeRingClose() run its body only once.
private boolean closeCompleted;
// Counter used by nextRegistrationId() to hand out registration ids.
private int nextRegistrationId = Integer.MIN_VALUE;


// Ids reserved for internal use; nextRegistrationId() never returns any of them.
// EVENTFD_ID tags the eventfd read, RINGFD_ID tags ring-level operations (NOP, timeouts),
// INVALID_ID is what a registration's submit(...) returns when the registration is no longer valid.
private static final int EVENTFD_ID = Integer.MAX_VALUE;
private static final int RINGFD_ID = EVENTFD_ID - 1;
private static final int INVALID_ID = 0;
72
/**
 * Creates a handler on top of the given ring and sets up the eventfd (plus its 8-byte read buffer)
 * that {@code wakeup(...)} relies on.
 *
 * @param ringBuffer the ring to drive; must not be {@code null}
 * @throws NullPointerException if {@code ringBuffer} is {@code null}
 */
IoUringIoHandler(RingBuffer ringBuffer) {
    // The availability check runs before any other native call made below.
    IoUring.ensureAvailability();

    this.ringBuffer = requireNonNull(ringBuffer, "ringBuffer");
    this.registrations = new IntObjectHashMap<>();

    // Wakeup plumbing: the eventfd itself and the off-heap destination for reads from it.
    this.eventfd = Native.newBlockingEventFd();
    this.eventfdReadBuf = PlatformDependent.allocateMemory(8);
}
81
82 @Override
83 public int run(IoExecutionContext context) {
84 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
85 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
86 if (!completionQueue.hasCompletions() && context.canBlock()) {
87 if (eventfdReadSubmitted == 0) {
88 submitEventFdRead();
89 }
90 if (context.deadlineNanos() != -1) {
91 submitTimeout(context);
92 }
93 submissionQueue.submitAndWait();
94 } else {
95 submissionQueue.submit();
96 }
97 return completionQueue.process(this::handle);
98 }
99
100 private void handle(int res, int flags, int id, byte op, short data) {
101 if (id == EVENTFD_ID) {
102 handleEventFdRead();
103 return;
104 }
105 if (id == RINGFD_ID) {
106 if (op == Native.IORING_OP_NOP && data == RING_CLOSE) {
107 completeRingClose();
108 }
109 return;
110 }
111 DefaultIoUringIoRegistration registration = registrations.get(id);
112 if (registration == null) {
113 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
114 Native.opToStr(op), id, res);
115 return;
116 }
117 registration.handle(res, flags, op, data);
118 }
119
120 private void handleEventFdRead() {
121 eventfdReadSubmitted = 0;
122 if (!eventFdClosing) {
123 eventfdAsyncNotify.set(false);
124 submitEventFdRead();
125 }
126 }
127
128 private void submitEventFdRead() {
129 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
130 eventfdReadSubmitted = submissionQueue.addEventFdRead(
131 eventfd.intValue(), eventfdReadBuf, 0, 8, EVENTFD_ID, (short) 0);
132 }
133
134 private void submitTimeout(IoExecutionContext context) {
135 long delayNanos = context.delayNanos(System.nanoTime());
136 ringBuffer.ioUringSubmissionQueue().addTimeout(
137 ringBuffer.fd(), delayNanos, RINGFD_ID, (short) 0);
138 }
139
140 @Override
141 public void prepareToDestroy() {
142 shuttingDown = true;
143 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
144 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
145
146 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
147
148 for (DefaultIoUringIoRegistration registration: copy) {
149 registration.close();
150 }
151
152
153 submissionQueue.addNop(ringBuffer.fd(), (byte) Native.IOSQE_IO_DRAIN, RINGFD_ID, (short) 0);
154 submissionQueue.submit();
155 while (completionQueue.hasCompletions()) {
156 completionQueue.process(this::handle);
157
158 if (submissionQueue.count() > 0) {
159 submissionQueue.submit();
160 }
161 }
162 }
163
164 @Override
165 public void destroy() {
166 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
167 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
168 drainEventFd();
169 if (submissionQueue.remaining() < 2) {
170
171
172 submissionQueue.submit();
173 }
174
175 submissionQueue.addNop(ringBuffer.fd(), (byte) Native.IOSQE_IO_DRAIN, RINGFD_ID, (short) 0);
176
177 submissionQueue.addLinkTimeout(ringBuffer.fd(), TimeUnit.MILLISECONDS.toNanos(200), RINGFD_ID, (short) 0);
178 submissionQueue.submitAndWait();
179 completionQueue.process(this::handle);
180 completeRingClose();
181 }
182
183
184
185
186
/**
 * Puts the eventfd into its closing state: if a wakeup has been flagged, waits until the matching
 * eventfd read completion has been processed, then cancels any eventfd read that is still outstanding.
 * Must only be called once (asserted via {@code eventFdClosing}).
 */
private void drainEventFd() {
    CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
    SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
    assert !eventFdClosing;
    eventFdClosing = true;
    // Setting the flag to true makes every later wakeup(...) skip its eventfd write; the previous value
    // tells us whether a wakeup had already been flagged.
    boolean eventPending = eventfdAsyncNotify.getAndSet(true);
    if (eventPending) {
        // A wakeup was flagged and not yet consumed, so an eventfd read completion has to be awaited.
        // Make sure a read of the eventfd is actually submitted.
        while (eventfdReadSubmitted == 0) {
            submitEventFdRead();
            submissionQueue.submit();
        }

        // Callback that records when the eventfd completion shows up and otherwise forwards everything
        // to the regular handler.
        class DrainFdEventCallback implements CompletionCallback {
            boolean eventFdDrained;

            @Override
            public void handle(int res, int flags, int id, byte op, short data) {
                if (id == EVENTFD_ID) {
                    eventFdDrained = true;
                }
                IoUringIoHandler.this.handle(res, flags, id, op, data);
            }
        }
        final DrainFdEventCallback handler = new DrainFdEventCallback();
        // Process what is already available, then keep submitting/waiting until the eventfd read completed.
        completionQueue.process(handler);
        while (!handler.eventFdDrained) {
            submissionQueue.submitAndWait();
            completionQueue.process(handler);
        }
    }

    // The flag stays true from here on (handleEventFdRead() no longer resets it while closing), so no
    // further wakeup writes are issued by wakeup(...). A read that is still submitted can be cancelled.
    if (eventfdReadSubmitted != 0) {
        submissionQueue.addCancel(eventfd.intValue(), eventfdReadSubmitted, EVENTFD_ID);
        eventfdReadSubmitted = 0;
        submissionQueue.submit();
    }
}
228
229 private void completeRingClose() {
230 if (closeCompleted) {
231
232 return;
233 }
234 closeCompleted = true;
235 ringBuffer.close();
236 try {
237 eventfd.close();
238 } catch (IOException e) {
239 logger.warn("Failed to close eventfd", e);
240 }
241 PlatformDependent.freeMemory(eventfdReadBuf);
242 }
243
244 @Override
245 public IoRegistration register(IoEventLoop eventLoop, IoHandle handle) throws Exception {
246 IoUringIoHandle ioHandle = cast(handle);
247 if (shuttingDown) {
248 throw new RejectedExecutionException("IoEventLoop is shutting down");
249 }
250 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(eventLoop, ioHandle);
251 for (;;) {
252 int id = nextRegistrationId();
253 DefaultIoUringIoRegistration old = registrations.put(id, registration);
254 if (old != null) {
255 assert old.handle != registration.handle;
256 registrations.put(id, old);
257 } else {
258 registration.setId(id);
259 break;
260 }
261 }
262
263 ringBuffer.ioUringSubmissionQueue().incrementHandledFds();
264 return registration;
265 }
266
267 private int nextRegistrationId() {
268 int id;
269 do {
270 id = nextRegistrationId++;
271 } while (id == RINGFD_ID || id == EVENTFD_ID || id == INVALID_ID);
272 return id;
273 }
274
275 private final class DefaultIoUringIoRegistration implements IoUringIoRegistration {
276 private final Promise<?> cancellationPromise;
277 private final IoEventLoop eventLoop;
278 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
279 final IoUringIoHandle handle;
280
281 private boolean removeLater;
282 private int outstandingCompletions;
283 private int id;
284
285 DefaultIoUringIoRegistration(IoEventLoop eventLoop, IoUringIoHandle handle) {
286 this.eventLoop = eventLoop;
287 this.handle = handle;
288 this.cancellationPromise = eventLoop.newPromise();
289 }
290
291 void setId(int id) {
292 this.id = id;
293 }
294
295 @Override
296 public long submit(IoOps ops) {
297 IoUringIoOps ioOps = (IoUringIoOps) ops;
298 if (!isValid()) {
299 return INVALID_ID;
300 }
301 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
302 if (eventLoop.inEventLoop()) {
303 submit0(ioOps, udata);
304 } else {
305 eventLoop.execute(() -> submit0(ioOps, udata));
306 }
307 return udata;
308 }
309
310 private void submit0(IoUringIoOps ioOps, long udata) {
311 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
312 ioOps.fd(), ioOps.union1(), ioOps.union2(), ioOps.len(), ioOps.union3(), udata,
313 ioOps.union4(), ioOps.personality(), ioOps.union5(), ioOps.union6()
314 );
315 outstandingCompletions++;
316 }
317
318 @Override
319 public IoUringIoHandler ioHandler() {
320 return IoUringIoHandler.this;
321 }
322
323 @Override
324 public void cancel() {
325 if (!cancellationPromise.trySuccess(null)) {
326
327 return;
328 }
329 if (eventLoop.inEventLoop()) {
330 tryRemove();
331 } else {
332 eventLoop.execute(this::tryRemove);
333 }
334 }
335
336 @Override
337 public Future<?> cancelFuture() {
338 return cancellationPromise;
339 }
340
341 private void tryRemove() {
342 if (outstandingCompletions > 0) {
343
344
345 removeLater = true;
346 return;
347 }
348 remove();
349 }
350
351 private void remove() {
352 DefaultIoUringIoRegistration old = registrations.remove(id);
353 assert old == this;
354 ringBuffer.ioUringSubmissionQueue().decrementHandledFds();
355 }
356
357 void close() {
358
359
360 assert eventLoop.inEventLoop();
361 try {
362 handle.close();
363 } catch (Exception e) {
364 logger.debug("Exception during closing " + handle, e);
365 }
366 }
367
368 void handle(int res, int flags, byte op, short data) {
369 event.update(res, flags, op, data);
370 handle.handle(this, event);
371 if (--outstandingCompletions == 0 && removeLater) {
372
373 removeLater = false;
374 remove();
375 }
376 }
377 }
378
379 private static IoUringIoHandle cast(IoHandle handle) {
380 if (handle instanceof IoUringIoHandle) {
381 return (IoUringIoHandle) handle;
382 }
383 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
384 }
385
386 @Override
387 public void wakeup(IoEventLoop eventLoop) {
388 if (!eventLoop.inEventLoop() && !eventfdAsyncNotify.getAndSet(true)) {
389
390 Native.eventFdWrite(eventfd.intValue(), 1L);
391 }
392 }
393
394 @Override
395 public boolean isCompatible(Class<? extends IoHandle> handleType) {
396 return IoUringIoHandle.class.isAssignableFrom(handleType);
397 }
398
399
400
401
402 byte[] inet4AddressArray() {
403 return inet4AddressArray;
404 }
405
406
407
408
409 byte[] inet6AddressArray() {
410 return inet6AddressArray;
411 }
412
413
414
415
416
417
418 public static IoHandlerFactory newFactory() {
419 IoUring.ensureAvailability();
420 return () -> new IoUringIoHandler(Native.createRingBuffer());
421 }
422
423
424
425
426
427
428
429
430 public static IoHandlerFactory newFactory(int ringSize) {
431 IoUring.ensureAvailability();
432 ObjectUtil.checkPositive(ringSize, "ringSize");
433 return () -> new IoUringIoHandler(Native.createRingBuffer(ringSize));
434 }
435 }