1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultFileRegion;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.IoRegistration;
29 import io.netty.channel.socket.DuplexChannel;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.channel.unix.Limits;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.io.IOException;
36 import java.net.SocketAddress;
37
38 import static io.netty.channel.unix.Errors.ioResult;
39
40 abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
41 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
42 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
43
44
45 private byte writeOpCode;
46
47 private long writeId;
48 private long readId;
49
50
51 private IoUringBufferRing bufferRing;
52
/**
 * Creates a new stream channel; all arguments are forwarded unchanged to the superclass constructor.
 *
 * @param parent the parent {@link Channel}
 * @param socket the {@link LinuxSocket} backing this channel
 * @param active the activity flag passed on to the superclass
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
    super(parent, socket, active);
}
56
/**
 * Creates a new stream channel; all arguments are forwarded unchanged to the superclass constructor.
 *
 * @param parent the parent {@link Channel}
 * @param socket the {@link LinuxSocket} backing this channel
 * @param remote the remote {@link SocketAddress} passed on to the superclass
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
    super(parent, socket, remote);
}
60
61 @Override
62 public ChannelMetadata metadata() {
63 return METADATA;
64 }
65
66 @Override
67 protected final AbstractUringUnsafe newUnsafe() {
68 return new IoUringStreamUnsafe();
69 }
70
71 @Override
72 public final ChannelFuture shutdown() {
73 return shutdown(newPromise());
74 }
75
76 @Override
77 public final ChannelFuture shutdown(final ChannelPromise promise) {
78 ChannelFuture shutdownOutputFuture = shutdownOutput();
79 if (shutdownOutputFuture.isDone()) {
80 shutdownOutputDone(shutdownOutputFuture, promise);
81 } else {
82 shutdownOutputFuture.addListener(new ChannelFutureListener() {
83 @Override
84 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
85 shutdownOutputDone(shutdownOutputFuture, promise);
86 }
87 });
88 }
89 return promise;
90 }
91
92 @Override
93 protected final void doShutdownOutput() throws Exception {
94 socket.shutdown(false, true);
95 }
96
97 private void shutdownInput0(final ChannelPromise promise) {
98 try {
99 socket.shutdown(true, false);
100 promise.setSuccess();
101 } catch (Throwable cause) {
102 promise.setFailure(cause);
103 }
104 }
105
106 @Override
107 public final boolean isOutputShutdown() {
108 return socket.isOutputShutdown();
109 }
110
111 @Override
112 public final boolean isInputShutdown() {
113 return socket.isInputShutdown();
114 }
115
116 @Override
117 public final boolean isShutdown() {
118 return socket.isShutdown();
119 }
120
121 @Override
122 public final ChannelFuture shutdownOutput() {
123 return shutdownOutput(newPromise());
124 }
125
126 @Override
127 public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
128 EventLoop loop = eventLoop();
129 if (loop.inEventLoop()) {
130 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
131 } else {
132 loop.execute(new Runnable() {
133 @Override
134 public void run() {
135 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
136 }
137 });
138 }
139
140 return promise;
141 }
142
143 @Override
144 public final ChannelFuture shutdownInput() {
145 return shutdownInput(newPromise());
146 }
147
148 @Override
149 public final ChannelFuture shutdownInput(final ChannelPromise promise) {
150 EventLoop loop = eventLoop();
151 if (loop.inEventLoop()) {
152 shutdownInput0(promise);
153 } else {
154 loop.execute(new Runnable() {
155 @Override
156 public void run() {
157 shutdownInput0(promise);
158 }
159 });
160 }
161 return promise;
162 }
163
164 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
165 ChannelFuture shutdownInputFuture = shutdownInput();
166 if (shutdownInputFuture.isDone()) {
167 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
168 } else {
169 shutdownInputFuture.addListener(new ChannelFutureListener() {
170 @Override
171 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
172 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
173 }
174 });
175 }
176 }
177
178 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
179 ChannelFuture shutdownInputFuture,
180 ChannelPromise promise) {
181 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
182 Throwable shutdownInputCause = shutdownInputFuture.cause();
183 if (shutdownOutputCause != null) {
184 if (shutdownInputCause != null) {
185 logger.info("Exception suppressed because a previous exception occurred.",
186 shutdownInputCause);
187 }
188 promise.setFailure(shutdownOutputCause);
189 } else if (shutdownInputCause != null) {
190 promise.setFailure(shutdownInputCause);
191 } else {
192 promise.setSuccess();
193 }
194 }
195
196 @Override
197 protected final void doRegister(ChannelPromise promise) {
198 super.doRegister(promise);
199 promise.addListener(f -> {
200 if (f.isSuccess()) {
201 short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
202 if (bgid >= 0) {
203 final IoUringIoHandler ioUringIoHandler = registration().attachment();
204 bufferRing = ioUringIoHandler.findBufferRing(bgid);
205 }
206 if (active) {
207
208 schedulePollRdHup();
209 }
210 }
211 });
212 }
213
214 @Override
215 protected Object filterOutboundMessage(Object msg) {
216
217
218 if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
219 return new IoUringFileRegion((DefaultFileRegion) msg);
220 }
221
222 return super.filterOutboundMessage(msg);
223 }
224
225 private final class IoUringStreamUnsafe extends AbstractUringUnsafe {
226
227 private ByteBuf readBuffer;
228 private IovArray iovArray;
229
230 @Override
231 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
232 assert iovArray == null;
233 assert writeId == 0;
234 int numElements = Math.min(in.size(), Limits.IOV_MAX);
235 ByteBuf iovArrayBuffer = alloc().directBuffer(numElements * IovArray.IOV_SIZE);
236 iovArray = new IovArray(iovArrayBuffer);
237 try {
238 int offset = iovArray.count();
239 in.forEachFlushedMessage(iovArray);
240
241 int fd = fd().intValue();
242 IoRegistration registration = registration();
243 IoUringIoOps ops = IoUringIoOps.newWritev(fd, flags((byte) 0), 0, iovArray.memoryAddress(offset),
244 iovArray.count() - offset, nextOpsId());
245 byte opCode = ops.opcode();
246 writeId = registration.submit(ops);
247 writeOpCode = opCode;
248 if (writeId == 0) {
249 iovArray.release();
250 iovArray = null;
251 return 0;
252 }
253 } catch (Exception e) {
254 iovArray.release();
255 iovArray = null;
256
257
258 scheduleWriteSingle(in.current());
259 }
260 return 1;
261 }
262
263 @Override
264 protected int scheduleWriteSingle(Object msg) {
265 assert iovArray == null;
266 assert writeId == 0;
267
268 int fd = fd().intValue();
269 IoRegistration registration = registration();
270 final IoUringIoOps ops;
271 if (msg instanceof IoUringFileRegion) {
272 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
273 try {
274 fileRegion.open();
275 } catch (IOException e) {
276 this.handleWriteError(e);
277 return 0;
278 }
279 ops = fileRegion.splice(fd);
280 } else {
281 ByteBuf buf = (ByteBuf) msg;
282 ops = IoUringIoOps.newWrite(fd, flags((byte) 0), 0,
283 buf.memoryAddress() + buf.readerIndex(), buf.readableBytes(), nextOpsId());
284 }
285
286 byte opCode = ops.opcode();
287 writeId = registration.submit(ops);
288 writeOpCode = opCode;
289 if (writeId == 0) {
290 return 0;
291 }
292 return 1;
293 }
294
295 private int calculateRecvFlags(boolean first) {
296
297
298
299
300
301
302 if (first) {
303 return 0;
304 }
305 return Native.MSG_DONTWAIT;
306 }
307
308 private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
309
310
311
312 if (first) {
313
314
315 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
316 Native.IORING_RECVSEND_POLL_FIRST : 0;
317 }
318 return 0;
319 }
320
321 @Override
322 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
323 assert readBuffer == null;
324 assert readId == 0 : readId;
325 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
326
327 if (bufferRing != null && bufferRing.isUsable()) {
328 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
329 }
330
331
332 ByteBuf byteBuf = allocHandle.allocate(alloc());
333 try {
334 int fd = fd().intValue();
335 IoRegistration registration = registration();
336 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
337 int recvFlags = calculateRecvFlags(first);
338
339 IoUringIoOps ops = IoUringIoOps.newRecv(fd, flags((byte) 0), ioPrio, recvFlags,
340 byteBuf.memoryAddress() + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
341 readId = registration.submit(ops);
342 if (readId == 0) {
343 return 0;
344 }
345 readBuffer = byteBuf;
346 byteBuf = null;
347 return 1;
348 } finally {
349 if (byteBuf != null) {
350 byteBuf.release();
351 }
352 }
353 }
354
355 private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
356 short bgId = bufferRing.bufferGroupId();
357 try {
358 boolean multishot = IoUring.isRecvMultishotSupported();
359 byte flags = flags((byte) Native.IOSQE_BUFFER_SELECT);
360 short ioPrio;
361 final int recvFlags;
362 if (multishot) {
363 ioPrio = Native.IORING_RECV_MULTISHOT;
364 recvFlags = 0;
365 } else {
366
367
368 ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
369 recvFlags = calculateRecvFlags(first);
370 }
371 if (IoUring.isRecvsendBundleSupported()) {
372
373
374 ioPrio |= Native.IORING_RECVSEND_BUNDLE;
375 }
376 IoRegistration registration = registration();
377 int fd = fd().intValue();
378 IoUringIoOps ops = IoUringIoOps.newRecv(
379 fd, flags, ioPrio, recvFlags, 0,
380 0, nextOpsId(), bgId
381 );
382 readId = registration.submit(ops);
383 if (readId == 0) {
384 return 0;
385 }
386 if (multishot) {
387
388 return -1;
389 }
390 return 1;
391 } catch (IllegalArgumentException illegalArgumentException) {
392 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
393 return 0;
394 }
395 }
396
397 @Override
398 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
399 ByteBuf byteBuf = readBuffer;
400 readBuffer = null;
401 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
402 readId = 0;
403
404
405 if (byteBuf != null) {
406
407 byteBuf.release();
408 }
409 return;
410 }
411 assert readId != 0;
412 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
413 boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
414 short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
415 boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;
416
417 boolean empty = socketIsEmpty(flags);
418 if (rearm) {
419
420 readId = 0;
421 }
422
423 boolean allDataRead = false;
424
425 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
426 final ChannelPipeline pipeline = pipeline();
427
428 try {
429 if (res < 0) {
430 if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
431
432
433
434 pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
435
436
437
438
439 scheduleRead(allocHandle.isFirstRead());
440 return;
441 }
442
443
444
445 allocHandle.lastBytesRead(ioResult("io_uring read", res));
446 } else if (res > 0) {
447 if (useBufferRing) {
448
449 if (IoUring.isRecvsendBundleSupported()) {
450
451
452
453
454
455 int read = res;
456 for (;;) {
457 int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
458 byteBuf = bufferRing.useBuffer(bid, read, more);
459 read -= byteBuf.readableBytes();
460 allocHandle.attemptedBytesRead(attemptedBytesRead);
461 allocHandle.lastBytesRead(byteBuf.readableBytes());
462
463 assert read >= 0;
464 if (read == 0) {
465
466
467 break;
468 }
469 allocHandle.incMessagesRead(1);
470 pipeline.fireChannelRead(byteBuf);
471 byteBuf = null;
472
473
474
475
476 bufferRing.fillBuffer(bid);
477 bid = bufferRing.nextBid(bid);
478 if (!allocHandle.continueReading()) {
479
480 allocHandle.readComplete();
481 pipeline.fireChannelReadComplete();
482 allocHandle.reset(config());
483 }
484 }
485 } else {
486 int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
487 byteBuf = bufferRing.useBuffer(bid, res, more);
488 allocHandle.attemptedBytesRead(attemptedBytesRead);
489 allocHandle.lastBytesRead(res);
490 }
491 } else {
492 int attemptedBytesRead = byteBuf.writableBytes();
493 byteBuf.writerIndex(byteBuf.writerIndex() + res);
494 allocHandle.attemptedBytesRead(attemptedBytesRead);
495 allocHandle.lastBytesRead(res);
496 }
497 } else {
498
499 allocHandle.lastBytesRead(-1);
500 }
501 if (allocHandle.lastBytesRead() <= 0) {
502
503 if (byteBuf != null) {
504
505 byteBuf.release();
506 byteBuf = null;
507 }
508 allDataRead = allocHandle.lastBytesRead() < 0;
509 if (allDataRead) {
510
511 shutdownInput(true);
512 }
513 allocHandle.readComplete();
514 pipeline.fireChannelReadComplete();
515 return;
516 }
517
518 allocHandle.incMessagesRead(1);
519 pipeline.fireChannelRead(byteBuf);
520 byteBuf = null;
521 if (useBufferRing && !more) {
522
523
524
525 bufferRing.fillBuffer(bid);
526 }
527 scheduleNextRead(pipeline, allocHandle, rearm, empty);
528 } catch (Throwable t) {
529 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
530 }
531 }
532
533 private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
534 boolean rearm, boolean empty) {
535 if (allocHandle.continueReading() && !empty) {
536 if (rearm) {
537
538
539 scheduleRead(false);
540 }
541 } else {
542
543 allocHandle.readComplete();
544 pipeline.fireChannelReadComplete();
545 }
546 }
547
548 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
549 Throwable cause, boolean allDataRead,
550 IoUringRecvByteAllocatorHandle allocHandle) {
551 if (byteBuf != null) {
552 if (byteBuf.isReadable()) {
553 pipeline.fireChannelRead(byteBuf);
554 } else {
555 byteBuf.release();
556 }
557 }
558 allocHandle.readComplete();
559 pipeline.fireChannelReadComplete();
560 pipeline.fireExceptionCaught(cause);
561 if (allDataRead || cause instanceof IOException) {
562 shutdownInput(true);
563 }
564 }
565
566 @Override
567 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
568 assert writeId != 0;
569 writeId = 0;
570 writeOpCode = 0;
571
572 ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
573 Object current = channelOutboundBuffer.current();
574 if (current instanceof IoUringFileRegion) {
575 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
576 try {
577 int result = res >= 0 ? res : ioResult("io_uring splice", res);
578 if (result == 0 && fileRegion.count() > 0) {
579 validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
580 return false;
581 }
582 int progress = fileRegion.handleResult(result, data);
583 if (progress == -1) {
584
585 channelOutboundBuffer.remove();
586 } else if (progress > 0) {
587 channelOutboundBuffer.progress(progress);
588 }
589 } catch (Throwable cause) {
590 handleWriteError(cause);
591 }
592 return true;
593 }
594
595 IovArray iovArray = this.iovArray;
596 if (iovArray != null) {
597 this.iovArray = null;
598 iovArray.release();
599 }
600 if (res >= 0) {
601 channelOutboundBuffer.removeBytes(res);
602 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
603 return true;
604 } else {
605 try {
606 if (ioResult("io_uring write", res) == 0) {
607 return false;
608 }
609 } catch (Throwable cause) {
610 handleWriteError(cause);
611 }
612 }
613 return true;
614 }
615
616 @Override
617 protected void freeResourcesNow(IoRegistration reg) {
618 super.freeResourcesNow(reg);
619 assert readBuffer == null;
620 }
621 }
622
623 @Override
624 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
625 if (readId != 0) {
626
627 assert numOutstandingReads == 1 || numOutstandingReads == -1;
628 IoUringIoOps ops = IoUringIoOps.newAsyncCancel(flags((byte) 0), readId, Native.IORING_OP_RECV);
629 long id = registration.submit(ops);
630 assert id != 0;
631 } else {
632 assert numOutstandingReads == 0 || numOutstandingReads == -1;
633 }
634 }
635
636 @Override
637 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
638 if (writeId != 0) {
639
640
641 assert numOutstandingWrites == 1;
642 assert writeOpCode != 0;
643 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags((byte) 0), writeId, writeOpCode));
644 assert id != 0;
645 } else {
646 assert numOutstandingWrites == 0;
647 }
648 }
649
650 @Override
651 protected boolean socketIsEmpty(int flags) {
652 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
653 }
654
655 @Override
656 protected void submitAndRunNow() {
657 if (writeId != 0) {
658
659
660 ((IoUringIoHandler) registration().attachment()).submitAndRunNow(writeId);
661 }
662 super.submitAndRunNow();
663 }
664
665 @Override
666 boolean isPollInFirst() {
667 return bufferRing == null || !bufferRing.isUsable();
668 }
669 }