1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.IoEventLoop;
19 import io.netty.channel.IoExecutionContext;
20 import io.netty.channel.IoHandle;
21 import io.netty.channel.IoHandler;
22 import io.netty.channel.IoHandlerFactory;
23 import io.netty.channel.IoOps;
24 import io.netty.channel.IoRegistration;
25 import io.netty.channel.unix.FileDescriptor;
26 import io.netty.util.collection.IntObjectHashMap;
27 import io.netty.util.collection.IntObjectMap;
28 import io.netty.util.concurrent.Future;
29 import io.netty.util.concurrent.Promise;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.StringUtil;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42
43 import static java.util.Objects.requireNonNull;
44
45
46
47
48 public final class IoUringIoHandler implements IoHandler {
49 private static final InternalLogger logger = InternalLoggerFactory.getInstance(IoUringIoHandler.class);
50 private static final short RING_CLOSE = 1;
51
52 private final RingBuffer ringBuffer;
53 private final IntObjectMap<DefaultIoUringIoRegistration> registrations;
54
55 private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
56 private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
57
58 private final AtomicBoolean eventfdAsyncNotify = new AtomicBoolean();
59 private final FileDescriptor eventfd;
60 private final long eventfdReadBuf;
61
62 private long eventfdReadSubmitted;
63 private boolean eventFdClosing;
64 private volatile boolean shuttingDown;
65 private boolean closeCompleted;
66 private int nextRegistrationId = Integer.MIN_VALUE;
67
68
69 private static final int EVENTFD_ID = Integer.MAX_VALUE;
70 private static final int RINGFD_ID = EVENTFD_ID - 1;
71
/**
 * Creates a handler that drives the given ring buffer.
 *
 * @param ringBuffer the ring buffer to use; must not be {@code null}
 * @throws NullPointerException if {@code ringBuffer} is {@code null}
 */
IoUringIoHandler(RingBuffer ringBuffer) {
    // Runs before any other native call made here (Native.newBlockingEventFd() below).
    IoUring.ensureAvailability();

    this.ringBuffer = requireNonNull(ringBuffer, "ringBuffer");
    this.registrations = new IntObjectHashMap<>();

    // eventfd used by wakeup(...) plus the 8 bytes of native memory its reads are stored in.
    // Both are released again in completeRingClose().
    this.eventfd = Native.newBlockingEventFd();
    this.eventfdReadBuf = PlatformDependent.allocateMemory(8);
}
80
81 @Override
82 public int run(IoExecutionContext context) {
83 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
84 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
85 if (!completionQueue.hasCompletions() && context.canBlock()) {
86 if (eventfdReadSubmitted == 0) {
87 submitEventFdRead();
88 }
89 if (context.deadlineNanos() != -1) {
90 submitTimeout(context);
91 }
92 submissionQueue.submitAndWait();
93 } else {
94 submissionQueue.submit();
95 }
96 return completionQueue.process(this::handle);
97 }
98
99 private void handle(int res, int flags, int id, byte op, short data) {
100 if (id == EVENTFD_ID) {
101 handleEventFdRead();
102 return;
103 }
104 if (id == RINGFD_ID) {
105 if (op == Native.IORING_OP_NOP && data == RING_CLOSE) {
106 completeRingClose();
107 }
108 return;
109 }
110 DefaultIoUringIoRegistration registration = registrations.get(id);
111 if (registration == null) {
112 logger.debug("ignoring {} completion for unknown registration (id={}, res={})",
113 Native.opToStr(op), id, res);
114 return;
115 }
116 registration.handle(res, flags, op, data);
117 }
118
119 private void handleEventFdRead() {
120 eventfdReadSubmitted = 0;
121 if (!eventFdClosing) {
122 eventfdAsyncNotify.set(false);
123 submitEventFdRead();
124 }
125 }
126
127 private void submitEventFdRead() {
128 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
129 eventfdReadSubmitted = submissionQueue.addEventFdRead(
130 eventfd.intValue(), eventfdReadBuf, 0, 8, EVENTFD_ID, (short) 0);
131 }
132
133 private void submitTimeout(IoExecutionContext context) {
134 long delayNanos = context.delayNanos(System.nanoTime());
135 ringBuffer.ioUringSubmissionQueue().addTimeout(
136 ringBuffer.fd(), delayNanos, RINGFD_ID, (short) 0);
137 }
138
139 @Override
140 public void prepareToDestroy() {
141 shuttingDown = true;
142 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
143 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
144
145 List<DefaultIoUringIoRegistration> copy = new ArrayList<>(registrations.values());
146
147 submissionQueue.addNop(ringBuffer.fd(), Native.IOSQE_IO_DRAIN, RINGFD_ID, (short) 0);
148
149 for (DefaultIoUringIoRegistration registration: copy) {
150 registration.close();
151 if (submissionQueue.count() > 0) {
152 submissionQueue.submit();
153 }
154 if (completionQueue.hasCompletions()) {
155 completionQueue.process(this::handle);
156 }
157 }
158 }
159
160 @Override
161 public void destroy() {
162 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
163 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
164 drainEventFd();
165 if (submissionQueue.remaining() < 2) {
166
167
168 submissionQueue.submit();
169 }
170
171 submissionQueue.addNop(ringBuffer.fd(), Native.IOSQE_IO_DRAIN, RINGFD_ID, (short) 0);
172
173 submissionQueue.addLinkTimeout(ringBuffer.fd(), TimeUnit.MILLISECONDS.toNanos(200), RINGFD_ID, (short) 0);
174 submissionQueue.submitAndWait();
175 completionQueue.process(this::handle);
176 completeRingClose();
177 }
178
179
180
181
182
183 private void drainEventFd() {
184 CompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
185 SubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
186 assert !eventFdClosing;
187 eventFdClosing = true;
188 boolean eventPending = eventfdAsyncNotify.getAndSet(true);
189 if (eventPending) {
190
191
192 while (eventfdReadSubmitted == 0) {
193 submitEventFdRead();
194 submissionQueue.submit();
195 }
196
197 class DrainFdEventCallback implements CompletionCallback {
198 boolean eventFdDrained;
199
200 @Override
201 public void handle(int res, int flags, int id, byte op, short data) {
202 if (id == EVENTFD_ID) {
203 eventFdDrained = true;
204 }
205 IoUringIoHandler.this.handle(res, flags, id, op, data);
206 }
207 }
208 final DrainFdEventCallback handler = new DrainFdEventCallback();
209 completionQueue.process(handler);
210 while (!handler.eventFdDrained) {
211 submissionQueue.submitAndWait();
212 completionQueue.process(handler);
213 }
214 }
215
216
217
218 if (eventfdReadSubmitted != 0) {
219 submissionQueue.addCancel(eventfd.intValue(), eventfdReadSubmitted, EVENTFD_ID);
220 eventfdReadSubmitted = 0;
221 submissionQueue.submit();
222 }
223 }
224
225 private void completeRingClose() {
226 if (closeCompleted) {
227
228 return;
229 }
230 closeCompleted = true;
231 ringBuffer.close();
232 try {
233 eventfd.close();
234 } catch (IOException e) {
235 logger.warn("Failed to close eventfd", e);
236 }
237 PlatformDependent.freeMemory(eventfdReadBuf);
238 }
239
240 @Override
241 public IoRegistration register(IoEventLoop eventLoop, IoHandle handle) throws Exception {
242 IoUringIoHandle ioHandle = cast(handle);
243 if (shuttingDown) {
244 throw new RejectedExecutionException("IoEventLoop is shutting down");
245 }
246 DefaultIoUringIoRegistration registration = new DefaultIoUringIoRegistration(eventLoop, ioHandle);
247 for (;;) {
248 int id = nextRegistrationId();
249 DefaultIoUringIoRegistration old = registrations.put(id, registration);
250 if (old != null) {
251 assert old.handle != registration.handle;
252 registrations.put(id, old);
253 } else {
254 registration.setId(id);
255 break;
256 }
257 }
258
259 ringBuffer.ioUringSubmissionQueue().incrementHandledFds();
260 return registration;
261 }
262
263 private int nextRegistrationId() {
264 int id;
265 do {
266 id = nextRegistrationId++;
267 } while (id == RINGFD_ID || id == EVENTFD_ID);
268 return id;
269 }
270
271 private final class DefaultIoUringIoRegistration implements IoUringIoRegistration {
272 private final Promise<?> cancellationPromise;
273 private final IoEventLoop eventLoop;
274 private final IoUringIoEvent event = new IoUringIoEvent(0, 0, (byte) 0, (short) 0);
275 final IoUringIoHandle handle;
276
277 private boolean removeLater;
278 private int outstandingCompletions;
279 private int id;
280
281 DefaultIoUringIoRegistration(IoEventLoop eventLoop, IoUringIoHandle handle) {
282 this.eventLoop = eventLoop;
283 this.handle = handle;
284 this.cancellationPromise = eventLoop.newPromise();
285 }
286
287 void setId(int id) {
288 this.id = id;
289 }
290
291 @Override
292 public long submit(IoOps ops) {
293 IoUringIoOps ioOps = (IoUringIoOps) ops;
294 long udata = UserData.encode(id, ioOps.opcode(), ioOps.data());
295 if (!isValid()) {
296 return udata;
297 }
298 if (eventLoop.inEventLoop()) {
299 submit0(ioOps, udata);
300 } else {
301 eventLoop.execute(() -> submit0(ioOps, udata));
302 }
303 return udata;
304 }
305
306 private void submit0(IoUringIoOps ioOps, long udata) {
307 ringBuffer.ioUringSubmissionQueue().enqueueSqe(ioOps.opcode(), ioOps.flags(), ioOps.ioPrio(),
308 ioOps.rwFlags(), ioOps.fd(), ioOps.bufferAddress(), ioOps.length(), ioOps.offset(), udata);
309 outstandingCompletions++;
310 }
311
312 @Override
313 public IoUringIoHandler ioHandler() {
314 return IoUringIoHandler.this;
315 }
316
317 @Override
318 public void cancel() {
319 if (cancellationPromise.trySuccess(null)) {
320 return;
321 }
322 if (eventLoop.inEventLoop()) {
323 tryRemove();
324 } else {
325 eventLoop.execute(this::tryRemove);
326 }
327 }
328
329 @Override
330 public Future<?> cancelFuture() {
331 return cancellationPromise;
332 }
333
334 private void tryRemove() {
335 if (outstandingCompletions > 0) {
336
337
338 removeLater = true;
339 return;
340 }
341 remove();
342 }
343
344 private void remove() {
345 DefaultIoUringIoRegistration old = registrations.remove(id);
346 assert old == this;
347 ringBuffer.ioUringSubmissionQueue().decrementHandledFds();
348 }
349
350 void close() {
351 assert eventLoop.inEventLoop();
352 try {
353 cancel();
354 } catch (Exception e) {
355 logger.debug("Exception during canceling " + this, e);
356 }
357 try {
358 handle.close();
359 } catch (Exception e) {
360 logger.debug("Exception during closing " + handle, e);
361 }
362 }
363
364 void handle(int res, int flags, byte op, short data) {
365 event.update(res, flags, op, data);
366 handle.handle(this, event);
367 if (--outstandingCompletions == 0 && removeLater) {
368
369 removeLater = false;
370 remove();
371 }
372 }
373 }
374
375 private static IoUringIoHandle cast(IoHandle handle) {
376 if (handle instanceof IoUringIoHandle) {
377 return (IoUringIoHandle) handle;
378 }
379 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
380 }
381
382 @Override
383 public void wakeup(IoEventLoop eventLoop) {
384 if (!eventLoop.inEventLoop() && !eventfdAsyncNotify.getAndSet(true)) {
385
386 Native.eventFdWrite(eventfd.intValue(), 1L);
387 }
388 }
389
390 @Override
391 public boolean isCompatible(Class<? extends IoHandle> handleType) {
392 return IoUringIoHandle.class.isAssignableFrom(handleType);
393 }
394
395
396
397
398 byte[] inet4AddressArray() {
399 return inet4AddressArray;
400 }
401
402
403
404
405 byte[] inet6AddressArray() {
406 return inet6AddressArray;
407 }
408
409
410
411
412
413
414 public static IoHandlerFactory newFactory() {
415 IoUring.ensureAvailability();
416 return () -> new IoUringIoHandler(Native.createRingBuffer());
417 }
418
419
420
421
422
423
424
425
426 public static IoHandlerFactory newFactory(int ringSize) {
427 IoUring.ensureAvailability();
428 ObjectUtil.checkPositive(ringSize, "ringSize");
429 return () -> new IoUringIoHandler(Native.createRingBuffer(ringSize));
430 }
431 }