1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultFileRegion;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.IoRegistration;
29 import io.netty.channel.socket.DuplexChannel;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.io.IOException;
35 import java.net.SocketAddress;
36
37 import static io.netty.channel.unix.Errors.ioResult;
38
39 abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
40 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
41 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
42
43
44 byte writeOpCode;
45
46 long writeId;
47 byte readOpCode;
48 long readId;
49
50
51 private IoUringBufferRing bufferRing;
52
/**
 * Creates a stream channel for the given socket.
 *
 * @param parent the parent channel, passed through to the superclass
 * @param socket the socket this channel wraps
 * @param active the initial active state, passed through to the superclass
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
    super(parent, socket, active);
}

/**
 * Creates a stream channel for the given socket and remote address.
 *
 * @param parent the parent channel, passed through to the superclass
 * @param socket the socket this channel wraps
 * @param remote the remote address, passed through to the superclass
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
    super(parent, socket, remote);
}
60
61 @Override
62 public ChannelMetadata metadata() {
63 return METADATA;
64 }
65
66 @Override
67 protected AbstractUringUnsafe newUnsafe() {
68 return new IoUringStreamUnsafe();
69 }
70
71 @Override
72 public final ChannelFuture shutdown() {
73 return shutdown(newPromise());
74 }
75
76 @Override
77 public final ChannelFuture shutdown(final ChannelPromise promise) {
78 ChannelFuture shutdownOutputFuture = shutdownOutput();
79 if (shutdownOutputFuture.isDone()) {
80 shutdownOutputDone(shutdownOutputFuture, promise);
81 } else {
82 shutdownOutputFuture.addListener(new ChannelFutureListener() {
83 @Override
84 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
85 shutdownOutputDone(shutdownOutputFuture, promise);
86 }
87 });
88 }
89 return promise;
90 }
91
92 @Override
93 protected final void doShutdownOutput() throws Exception {
94 socket.shutdown(false, true);
95 }
96
97 private void shutdownInput0(final ChannelPromise promise) {
98 try {
99 socket.shutdown(true, false);
100 promise.setSuccess();
101 } catch (Throwable cause) {
102 promise.setFailure(cause);
103 }
104 }
105
106 @Override
107 public final boolean isOutputShutdown() {
108 return socket.isOutputShutdown();
109 }
110
111 @Override
112 public final boolean isInputShutdown() {
113 return socket.isInputShutdown();
114 }
115
116 @Override
117 public final boolean isShutdown() {
118 return socket.isShutdown();
119 }
120
121 @Override
122 public final ChannelFuture shutdownOutput() {
123 return shutdownOutput(newPromise());
124 }
125
126 @Override
127 public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
128 EventLoop loop = eventLoop();
129 if (loop.inEventLoop()) {
130 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
131 } else {
132 loop.execute(new Runnable() {
133 @Override
134 public void run() {
135 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
136 }
137 });
138 }
139
140 return promise;
141 }
142
143 @Override
144 public final ChannelFuture shutdownInput() {
145 return shutdownInput(newPromise());
146 }
147
148 @Override
149 public final ChannelFuture shutdownInput(final ChannelPromise promise) {
150 EventLoop loop = eventLoop();
151 if (loop.inEventLoop()) {
152 shutdownInput0(promise);
153 } else {
154 loop.execute(new Runnable() {
155 @Override
156 public void run() {
157 shutdownInput0(promise);
158 }
159 });
160 }
161 return promise;
162 }
163
164 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
165 ChannelFuture shutdownInputFuture = shutdownInput();
166 if (shutdownInputFuture.isDone()) {
167 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
168 } else {
169 shutdownInputFuture.addListener(new ChannelFutureListener() {
170 @Override
171 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
172 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
173 }
174 });
175 }
176 }
177
178 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
179 ChannelFuture shutdownInputFuture,
180 ChannelPromise promise) {
181 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
182 Throwable shutdownInputCause = shutdownInputFuture.cause();
183 if (shutdownOutputCause != null) {
184 if (shutdownInputCause != null) {
185 logger.info("Exception suppressed because a previous exception occurred.",
186 shutdownInputCause);
187 }
188 promise.setFailure(shutdownOutputCause);
189 } else if (shutdownInputCause != null) {
190 promise.setFailure(shutdownInputCause);
191 } else {
192 promise.setSuccess();
193 }
194 }
195
196 @Override
197 protected final void doRegister(ChannelPromise promise) {
198 ChannelPromise registerPromise = this.newPromise();
199
200 registerPromise.addListener(f -> {
201 if (f.isSuccess()) {
202 try {
203 short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
204 if (bgid >= 0) {
205 final IoUringIoHandler ioUringIoHandler = registration().attachment();
206 bufferRing = ioUringIoHandler.findBufferRing(bgid);
207 }
208 if (active) {
209
210 schedulePollRdHup();
211 }
212 } finally {
213 promise.setSuccess();
214 }
215 } else {
216 promise.setFailure(f.cause());
217 }
218 });
219
220 super.doRegister(registerPromise);
221 }
222
223 @Override
224 protected Object filterOutboundMessage(Object msg) {
225
226
227 if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
228 return new IoUringFileRegion((DefaultFileRegion) msg);
229 }
230
231 return super.filterOutboundMessage(msg);
232 }
233
234 protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
235
236 private ByteBuf readBuffer;
237
238 @Override
239 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
240 assert writeId == 0;
241
242 int fd = fd().intValue();
243 IoRegistration registration = registration();
244 IoUringIoHandler handler = registration.attachment();
245 IovArray iovArray = handler.iovArray();
246 int offset = iovArray.count();
247
248 try {
249 in.forEachFlushedMessage(iovArray);
250 } catch (Exception e) {
251
252 return scheduleWriteSingle(in.current());
253 }
254 long iovArrayAddress = iovArray.memoryAddress(offset);
255 int iovArrayLength = iovArray.count() - offset;
256
257 IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArrayAddress, iovArrayLength, nextOpsId());
258
259 byte opCode = ops.opcode();
260 writeId = registration.submit(ops);
261 writeOpCode = opCode;
262 if (writeId == 0) {
263 return 0;
264 }
265 return 1;
266 }
267
268 @Override
269 protected int scheduleWriteSingle(Object msg) {
270 assert writeId == 0;
271
272 int fd = fd().intValue();
273 IoRegistration registration = registration();
274 final IoUringIoOps ops;
275 if (msg instanceof IoUringFileRegion) {
276 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
277 try {
278 fileRegion.open();
279 } catch (IOException e) {
280 this.handleWriteError(e);
281 return 0;
282 }
283 ops = fileRegion.splice(fd);
284 } else {
285 ByteBuf buf = (ByteBuf) msg;
286 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
287 int length = buf.readableBytes();
288 short opsid = nextOpsId();
289
290 ops = IoUringIoOps.newWrite(fd, (byte) 0, 0, address, length, opsid);
291 }
292 byte opCode = ops.opcode();
293 writeId = registration.submit(ops);
294 writeOpCode = opCode;
295 if (writeId == 0) {
296 return 0;
297 }
298 return 1;
299 }
300
301 private int calculateRecvFlags(boolean first) {
302
303
304
305
306
307
308 if (first) {
309 return 0;
310 }
311 return Native.MSG_DONTWAIT;
312 }
313
314 private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
315
316
317
318 if (first) {
319
320
321 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
322 Native.IORING_RECVSEND_POLL_FIRST : 0;
323 }
324 return 0;
325 }
326
327 @Override
328 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
329 assert readBuffer == null;
330 assert readId == 0 : readId;
331 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
332
333 if (bufferRing != null && bufferRing.isUsable()) {
334 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
335 }
336
337
338 ByteBuf byteBuf = allocHandle.allocate(alloc());
339 try {
340 int fd = fd().intValue();
341 IoRegistration registration = registration();
342 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
343 int recvFlags = calculateRecvFlags(first);
344
345 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
346 IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
347 readId = registration.submit(ops);
348 readOpCode = Native.IORING_OP_RECV;
349 if (readId == 0) {
350 return 0;
351 }
352 readBuffer = byteBuf;
353 byteBuf = null;
354 return 1;
355 } finally {
356 if (byteBuf != null) {
357 byteBuf.release();
358 }
359 }
360 }
361
362 private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
363 short bgId = bufferRing.bufferGroupId();
364 try {
365 boolean multishot = IoUring.isRecvMultishotEnabled();
366 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
367 short ioPrio;
368 final int recvFlags;
369 if (multishot) {
370 ioPrio = Native.IORING_RECV_MULTISHOT;
371 recvFlags = 0;
372 } else {
373
374
375 ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
376 recvFlags = calculateRecvFlags(first);
377 }
378 if (IoUring.isRecvsendBundleEnabled()) {
379
380
381 ioPrio |= Native.IORING_RECVSEND_BUNDLE;
382 }
383 IoRegistration registration = registration();
384 int fd = fd().intValue();
385 IoUringIoOps ops = IoUringIoOps.newRecv(
386 fd, flags, ioPrio, recvFlags, 0,
387 0, nextOpsId(), bgId
388 );
389 readId = registration.submit(ops);
390 readOpCode = Native.IORING_OP_RECV;
391 if (readId == 0) {
392 return 0;
393 }
394 if (multishot) {
395
396 return -1;
397 }
398 return 1;
399 } catch (IllegalArgumentException illegalArgumentException) {
400 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
401 return 0;
402 }
403 }
404
/**
 * Handles the completion of a recv submitted by scheduleRead0 / scheduleReadProviderBuffer.
 * It passes the received bytes to the pipeline, handles cancellation, ENOBUFS, errors and EOF, and
 * decides whether another read must be scheduled.
 *
 * @param res   the recv result: bytes read (> 0), 0 for EOF, or a negative errno
 * @param flags the CQE flags (more-completions, buffer-ring selection and id, socket non-empty)
 */
@Override
protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
    // Take over the buffer scheduleRead0 allocated for this recv; it is null when the recv picked
    // its buffer from the buffer ring instead.
    ByteBuf byteBuf = readBuffer;
    readBuffer = null;
    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        // The recv was cancelled (cancelOutstandingReads submits async cancels): nothing was read.
        readId = 0;
        // Release the buffer we own, if any.
        if (byteBuf != null) {
            // Plain recv (no buffer ring): the buffer is ours to free.
            byteBuf.release();
        }
        return;
    }
    // IORING_CQE_F_MORE unset: this submission produces no further completions (e.g. a single-shot
    // recv, or a multishot recv that ended), so a new read must be submitted to keep reading.
    boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
    // IORING_CQE_F_BUFFER set: the kernel picked a ring buffer; its id sits in the upper flag bits.
    boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
    short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
    // NOTE(review): IORING_CQE_F_BUF_MORE presumably means the ring buffer was only partially consumed
    // and will receive more data; it is forwarded to IoUringBufferRing.useBuffer -- confirm there.
    boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;

    boolean empty = socketIsEmpty(flags);
    if (rearm) {
        // No more completions will arrive for this readId.
        readId = 0;
    }

    boolean allDataRead = false;

    final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    final ChannelPipeline pipeline = pipeline();

    try {
        if (res < 0) {
            if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
                // ENOBUFS: the buffer ring had no free buffer for this recv; try to grow the ring.
                if (!bufferRing.expand()) {
                    // The ring cannot grow any further: tell the user the ring was exhausted.
                    // Frequent events suggest the ring size should be increased.
                    pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
                }
                // Retry immediately without recording anything on allocHandle, because no data was
                // actually read.
                scheduleRead(allocHandle.isFirstRead());
                return;
            }
            // Any other error goes through ioResult(...). It either throws (handled by the catch
            // below) or returns the value recorded as the last read size.
            allocHandle.lastBytesRead(ioResult("io_uring read", res));
        } else if (res > 0) {
            if (useBufferRing) {
                // With IORING_RECVSEND_BUNDLE one completion can span several consecutive ring
                // buffers. Every buffer except the last is fired inside this loop; the last one
                // falls through to the common handling after the loop.
                int read = res;
                for (;;) {
                    int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
                    byteBuf = bufferRing.useBuffer(bid, read, more);
                    read -= byteBuf.readableBytes();
                    allocHandle.attemptedBytesRead(attemptedBytesRead);
                    allocHandle.lastBytesRead(byteBuf.readableBytes());

                    assert read >= 0;
                    if (read == 0) {
                        // Last buffer of this completion: leave byteBuf for the code below.
                        break;
                    }
                    allocHandle.incMessagesRead(1);
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    bid = bufferRing.nextBid(bid);
                    if (!allocHandle.continueReading()) {
                        // Mimic the end of a normal read loop before handing out the remaining buffers.
                        allocHandle.readComplete();
                        pipeline.fireChannelReadComplete();
                        allocHandle.reset(config());
                    }
                }
            } else {
                // Plain recv into readBuffer: move writerIndex past the received bytes.
                int attemptedBytesRead = byteBuf.writableBytes();
                byteBuf.writerIndex(byteBuf.writerIndex() + res);
                allocHandle.attemptedBytesRead(attemptedBytesRead);
                allocHandle.lastBytesRead(res);
            }
        } else {
            // res == 0 means EOF, which is signalled to the handle as -1.
            allocHandle.lastBytesRead(-1);
        }
        if (allocHandle.lastBytesRead() <= 0) {
            // Nothing was read, or EOF. byteBuf may be null when a buffer ring was used.
            if (byteBuf != null) {
                // Release the unused buffer.
                byteBuf.release();
                byteBuf = null;
            }
            allDataRead = allocHandle.lastBytesRead() < 0;
            if (allDataRead) {
                // EOF: nothing more will arrive, so shut down the input side.
                shutdownInput(true);
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            return;
        }

        allocHandle.incMessagesRead(1);
        pipeline.fireChannelRead(byteBuf);
        // Ownership passed to the pipeline.
        byteBuf = null;
        scheduleNextRead(pipeline, allocHandle, rearm, empty);
    } catch (Throwable t) {
        // byteBuf is non-null here only if it was not yet handed to the pipeline.
        handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
    }
}
523
524 private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
525 boolean rearm, boolean empty) {
526 if (allocHandle.continueReading() && !empty) {
527 if (rearm) {
528
529
530 scheduleRead(false);
531 }
532 } else {
533
534 allocHandle.readComplete();
535 pipeline.fireChannelReadComplete();
536 }
537 }
538
539 protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
540 Throwable cause, boolean allDataRead,
541 IoUringRecvByteAllocatorHandle allocHandle) {
542 if (byteBuf != null) {
543 if (byteBuf.isReadable()) {
544 pipeline.fireChannelRead(byteBuf);
545 } else {
546 byteBuf.release();
547 }
548 }
549 allocHandle.readComplete();
550 pipeline.fireChannelReadComplete();
551 pipeline.fireExceptionCaught(cause);
552 if (allDataRead || cause instanceof IOException) {
553 shutdownInput(true);
554 }
555 }
556
557 private boolean handleWriteCompleteFileRegion(ChannelOutboundBuffer channelOutboundBuffer,
558 IoUringFileRegion fileRegion, int res, short data) {
559 try {
560 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
561 return true;
562 }
563 int result = res >= 0 ? res : ioResult("io_uring splice", res);
564 if (result == 0 && fileRegion.count() > 0) {
565 validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
566 return false;
567 }
568 int progress = fileRegion.handleResult(result, data);
569 if (progress == -1) {
570
571 channelOutboundBuffer.remove();
572 } else if (progress > 0) {
573 channelOutboundBuffer.progress(progress);
574 }
575 } catch (Throwable cause) {
576 handleWriteError(cause);
577 }
578 return true;
579 }
580
581 @Override
582 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
583 if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
584
585
586
587
588 writeId = 0;
589 writeOpCode = 0;
590 }
591 ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
592 Object current = channelOutboundBuffer.current();
593 if (current instanceof IoUringFileRegion) {
594 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
595 return handleWriteCompleteFileRegion(channelOutboundBuffer, fileRegion, res, data);
596 }
597
598 if (res >= 0) {
599 channelOutboundBuffer.removeBytes(res);
600 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
601 return true;
602 } else {
603 try {
604 if (ioResult("io_uring write", res) == 0) {
605 return false;
606 }
607 } catch (Throwable cause) {
608 handleWriteError(cause);
609 }
610 }
611 return true;
612 }
613
614 @Override
615 protected void freeResourcesNow(IoRegistration reg) {
616 super.freeResourcesNow(reg);
617 assert readBuffer == null;
618 }
619 }
620
621 @Override
622 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
623 if (readId != 0) {
624
625 assert numOutstandingReads == 1 || numOutstandingReads == -1;
626 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
627 long id = registration.submit(ops);
628 assert id != 0;
629 readId = 0;
630 }
631 }
632
633 @Override
634 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
635 if (writeId != 0) {
636
637
638 assert numOutstandingWrites == 1;
639 assert writeOpCode != 0;
640 long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
641 assert id != 0;
642 writeId = 0;
643 }
644 }
645
646 @Override
647 protected boolean socketIsEmpty(int flags) {
648 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
649 }
650
651 @Override
652 boolean isPollInFirst() {
653 return bufferRing == null || !bufferRing.isUsable();
654 }
655 }