1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultFileRegion;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.IoRegistration;
29 import io.netty.channel.socket.DuplexChannel;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.channel.unix.Limits;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.io.IOException;
36 import java.net.SocketAddress;
37
38 import static io.netty.channel.unix.Errors.ioResult;
39
40 abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
/** Logger of this class; used when a shutdown-input failure is suppressed in favour of an earlier one. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
/** Shared metadata instance returned by {@link #metadata()}. */
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

// Opcode of the write operation that is currently outstanding. It is stored together with writeId
// so the matching async-cancel can be built later; reset to 0 when the write completes.
private byte writeOpCode;

// Id returned by IoRegistration.submit(...) for the outstanding write; 0 means "no write outstanding".
private long writeId;
// Id returned by IoRegistration.submit(...) for the outstanding read; 0 means "no read outstanding".
private long readId;

// Buffer ring looked up during registration when the configured buffer group id is >= 0;
// stays null otherwise, in which case reads allocate their own buffer.
private IoUringBufferRing bufferRing;
52
/**
 * Creates a new stream channel by delegating all arguments to the super class constructor.
 *
 * @param parent the parent channel, passed through unchanged
 * @param socket the socket backing this channel
 * @param active passed through to the super class (presumably whether the channel starts out
 *               active; confirm against {@code AbstractIoUringChannel})
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
    super(parent, socket, active);
}
56
/**
 * Creates a new stream channel by delegating all arguments to the super class constructor.
 *
 * @param parent the parent channel, passed through unchanged
 * @param socket the socket backing this channel
 * @param remote the remote address, passed through to the super class
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
    super(parent, socket, remote);
}
60
/**
 * Returns the metadata shared by all instances of this channel type.
 *
 * @return the static {@code METADATA} constant
 */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
65
/**
 * Creates the unsafe implementation that schedules and completes stream reads and writes.
 *
 * @return a new {@code IoUringStreamUnsafe}
 */
@Override
protected final AbstractUringUnsafe newUnsafe() {
    return new IoUringStreamUnsafe();
}
70
71 @Override
72 public final ChannelFuture shutdown() {
73 return shutdown(newPromise());
74 }
75
76 @Override
77 public final ChannelFuture shutdown(final ChannelPromise promise) {
78 ChannelFuture shutdownOutputFuture = shutdownOutput();
79 if (shutdownOutputFuture.isDone()) {
80 shutdownOutputDone(shutdownOutputFuture, promise);
81 } else {
82 shutdownOutputFuture.addListener(new ChannelFutureListener() {
83 @Override
84 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
85 shutdownOutputDone(shutdownOutputFuture, promise);
86 }
87 });
88 }
89 return promise;
90 }
91
/**
 * Performs the output shutdown on the underlying socket.
 *
 * @throws Exception if the socket call fails
 */
@Override
protected final void doShutdownOutput() throws Exception {
    // NOTE(review): the arguments are presumably (read, write), i.e. only the write side is shut
    // down here; confirm against LinuxSocket.shutdown(boolean, boolean).
    socket.shutdown(false, true);
}
96
/**
 * Shuts down the socket with the arguments {@code (true, false)} and completes the promise
 * with the outcome.
 *
 * @param promise succeeded when the socket call returns normally, failed with the thrown
 *                {@link Throwable} otherwise
 */
private void shutdownInput0(final ChannelPromise promise) {
    try {
        // NOTE(review): the arguments are presumably (read, write), i.e. only the read side is
        // shut down here; confirm against LinuxSocket.shutdown(boolean, boolean).
        socket.shutdown(true, false);
        promise.setSuccess();
    } catch (Throwable cause) {
        // Anything thrown inside the try block, including by setSuccess() itself, ends up here.
        promise.setFailure(cause);
    }
}
105
/**
 * Reports the output-shutdown state by asking the underlying socket.
 *
 * @return the value of {@code socket.isOutputShutdown()}
 */
@Override
public final boolean isOutputShutdown() {
    return socket.isOutputShutdown();
}
110
/**
 * Reports the input-shutdown state by asking the underlying socket.
 *
 * @return the value of {@code socket.isInputShutdown()}
 */
@Override
public final boolean isInputShutdown() {
    return socket.isInputShutdown();
}
115
/**
 * Reports the overall shutdown state by asking the underlying socket.
 *
 * @return the value of {@code socket.isShutdown()}
 */
@Override
public final boolean isShutdown() {
    return socket.isShutdown();
}
120
121 @Override
122 public final ChannelFuture shutdownOutput() {
123 return shutdownOutput(newPromise());
124 }
125
126 @Override
127 public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
128 EventLoop loop = eventLoop();
129 if (loop.inEventLoop()) {
130 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
131 } else {
132 loop.execute(new Runnable() {
133 @Override
134 public void run() {
135 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
136 }
137 });
138 }
139
140 return promise;
141 }
142
143 @Override
144 public final ChannelFuture shutdownInput() {
145 return shutdownInput(newPromise());
146 }
147
148 @Override
149 public final ChannelFuture shutdownInput(final ChannelPromise promise) {
150 EventLoop loop = eventLoop();
151 if (loop.inEventLoop()) {
152 shutdownInput0(promise);
153 } else {
154 loop.execute(new Runnable() {
155 @Override
156 public void run() {
157 shutdownInput0(promise);
158 }
159 });
160 }
161 return promise;
162 }
163
164 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
165 ChannelFuture shutdownInputFuture = shutdownInput();
166 if (shutdownInputFuture.isDone()) {
167 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
168 } else {
169 shutdownInputFuture.addListener(new ChannelFutureListener() {
170 @Override
171 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
172 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
173 }
174 });
175 }
176 }
177
178 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
179 ChannelFuture shutdownInputFuture,
180 ChannelPromise promise) {
181 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
182 Throwable shutdownInputCause = shutdownInputFuture.cause();
183 if (shutdownOutputCause != null) {
184 if (shutdownInputCause != null) {
185 logger.info("Exception suppressed because a previous exception occurred.",
186 shutdownInputCause);
187 }
188 promise.setFailure(shutdownOutputCause);
189 } else if (shutdownInputCause != null) {
190 promise.setFailure(shutdownInputCause);
191 } else {
192 promise.setSuccess();
193 }
194 }
195
196 @Override
197 protected final void doRegister(ChannelPromise promise) {
198 ChannelPromise registerPromise = this.newPromise();
199
200 registerPromise.addListener(f -> {
201 if (f.isSuccess()) {
202 try {
203 short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
204 if (bgid >= 0) {
205 final IoUringIoHandler ioUringIoHandler = registration().attachment();
206 bufferRing = ioUringIoHandler.findBufferRing(bgid);
207 }
208 if (active) {
209
210 schedulePollRdHup();
211 }
212 } finally {
213 promise.setSuccess();
214 }
215 } else {
216 promise.setFailure(f.cause());
217 }
218 });
219
220 super.doRegister(registerPromise);
221 }
222
223 @Override
224 protected Object filterOutboundMessage(Object msg) {
225
226
227 if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
228 return new IoUringFileRegion((DefaultFileRegion) msg);
229 }
230
231 return super.filterOutboundMessage(msg);
232 }
233
234 private final class IoUringStreamUnsafe extends AbstractUringUnsafe {
235
// Buffer handed to the outstanding recv that was scheduled without the buffer ring;
// null when no such read is in flight. Ownership moves to readComplete0(...) on completion.
private ByteBuf readBuffer;
237
238 @Override
239 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
240 assert writeId == 0;
241 int numElements = Math.min(in.size(), Limits.IOV_MAX);
242 IoUringIoHandler handler = registration().attachment();
243 IovArray iovArray = handler.iovArray();
244 try {
245 int offset = iovArray.count();
246 in.forEachFlushedMessage(iovArray);
247
248 int fd = fd().intValue();
249 IoRegistration registration = registration();
250 IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArray.memoryAddress(offset),
251 iovArray.count() - offset, nextOpsId());
252 byte opCode = ops.opcode();
253 writeId = registration.submit(ops);
254 writeOpCode = opCode;
255 if (writeId == 0) {
256 return 0;
257 }
258 } catch (Exception e) {
259
260 scheduleWriteSingle(in.current());
261 }
262 return 1;
263 }
264
265 @Override
266 protected int scheduleWriteSingle(Object msg) {
267 assert writeId == 0;
268
269 int fd = fd().intValue();
270 IoRegistration registration = registration();
271 final IoUringIoOps ops;
272 if (msg instanceof IoUringFileRegion) {
273 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
274 try {
275 fileRegion.open();
276 } catch (IOException e) {
277 this.handleWriteError(e);
278 return 0;
279 }
280 ops = fileRegion.splice(fd);
281 } else {
282 ByteBuf buf = (ByteBuf) msg;
283 ops = IoUringIoOps.newWrite(fd, (byte) 0, 0,
284 IoUring.memoryAddress(buf) + buf.readerIndex(), buf.readableBytes(), nextOpsId());
285 }
286
287 byte opCode = ops.opcode();
288 writeId = registration.submit(ops);
289 writeOpCode = opCode;
290 if (writeId == 0) {
291 return 0;
292 }
293 return 1;
294 }
295
296 private int calculateRecvFlags(boolean first) {
297
298
299
300
301
302
303 if (first) {
304 return 0;
305 }
306 return Native.MSG_DONTWAIT;
307 }
308
309 private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
310
311
312
313 if (first) {
314
315
316 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
317 Native.IORING_RECVSEND_POLL_FIRST : 0;
318 }
319 return 0;
320 }
321
322 @Override
323 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
324 assert readBuffer == null;
325 assert readId == 0 : readId;
326 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
327
328 if (bufferRing != null && bufferRing.isUsable()) {
329 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
330 }
331
332
333 ByteBuf byteBuf = allocHandle.allocate(alloc());
334 try {
335 int fd = fd().intValue();
336 IoRegistration registration = registration();
337 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
338 int recvFlags = calculateRecvFlags(first);
339
340 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
341 IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
342 readId = registration.submit(ops);
343 if (readId == 0) {
344 return 0;
345 }
346 readBuffer = byteBuf;
347 byteBuf = null;
348 return 1;
349 } finally {
350 if (byteBuf != null) {
351 byteBuf.release();
352 }
353 }
354 }
355
356 private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
357 short bgId = bufferRing.bufferGroupId();
358 try {
359 boolean multishot = IoUring.isRecvMultishotEnabled();
360 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
361 short ioPrio;
362 final int recvFlags;
363 if (multishot) {
364 ioPrio = Native.IORING_RECV_MULTISHOT;
365 recvFlags = 0;
366 } else {
367
368
369 ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
370 recvFlags = calculateRecvFlags(first);
371 }
372 if (IoUring.isRecvsendBundleEnabled()) {
373
374
375 ioPrio |= Native.IORING_RECVSEND_BUNDLE;
376 }
377 IoRegistration registration = registration();
378 int fd = fd().intValue();
379 IoUringIoOps ops = IoUringIoOps.newRecv(
380 fd, flags, ioPrio, recvFlags, 0,
381 0, nextOpsId(), bgId
382 );
383 readId = registration.submit(ops);
384 if (readId == 0) {
385 return 0;
386 }
387 if (multishot) {
388
389 return -1;
390 }
391 return 1;
392 } catch (IllegalArgumentException illegalArgumentException) {
393 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
394 return 0;
395 }
396 }
397
/**
 * Handles the completion of a recv that was scheduled by {@code scheduleRead0(...)}.
 *
 * <p>Depending on the result the received bytes are fired through the pipeline (from the
 * self-allocated buffer or from one or more buffer-ring buffers), the input is shut down on
 * end-of-stream, or the read is re-scheduled. Any {@link Throwable} raised while processing is
 * routed to {@code handleReadException(...)}.
 *
 * @param op          the opcode of the completed operation (not used here)
 * @param res         the completion result: negative errno, {@code 0}, or number of bytes read
 * @param flags       the completion flags, inspected for MORE / BUFFER / BUF_MORE and the buffer id
 * @param data        additional completion data (not used here)
 * @param outstanding number of outstanding operations (not used here)
 */
@Override
protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
    // Take over the buffer of a non-buffer-ring read; null when the buffer ring was used.
    ByteBuf byteBuf = readBuffer;
    readBuffer = null;
    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        readId = 0;
        // The read was cancelled, nothing was received: just drop our own buffer, if any.
        if (byteBuf != null) {
            byteBuf.release();
        }
        return;
    }
    // NOTE(review): IORING_CQE_F_MORE presumably means further completions will follow for the
    // same submission (multishot); without it a new recv has to be armed. Confirm with io_uring docs.
    boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
    // Whether the completion carries a buffer id, i.e. the data sits in a buffer-ring buffer.
    boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
    // The buffer id is stored in the upper bits of flags.
    short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
    // Passed on to IoUringBufferRing.useBuffer(...); when false the ring is refilled further below.
    boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;

    boolean empty = socketIsEmpty(flags);
    if (rearm) {
        // No further completion is expected for this submission, so forget its id.
        readId = 0;
    }

    boolean allDataRead = false;

    final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    final ChannelPipeline pipeline = pipeline();

    try {
        if (res < 0) {
            if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
                // The buffer ring had no buffer available: notify the user, ask the ring to
                // expand and schedule the read again without counting this as a read attempt.
                pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
                bufferRing.expand();
                scheduleRead(allocHandle.isFirstRead());
                return;
            }

            // ioResult(...) either throws for the error or returns a value that is recorded as
            // the number of bytes read.
            allocHandle.lastBytesRead(ioResult("io_uring read", res));
        } else if (res > 0) {
            if (useBufferRing) {
                if (IoUring.isRecvsendBundleEnabled()) {
                    // With bundles one completion may span several consecutive ring buffers:
                    // walk them until all 'res' bytes are accounted for.
                    int read = res;
                    for (;;) {
                        int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
                        byteBuf = bufferRing.useBuffer(bid, read, more);
                        read -= byteBuf.readableBytes();
                        allocHandle.attemptedBytesRead(attemptedBytesRead);
                        allocHandle.lastBytesRead(byteBuf.readableBytes());

                        assert read >= 0;
                        if (read == 0) {
                            // Last buffer of the bundle: it is fired by the common code after
                            // the loop, which also refills the ring if needed.
                            break;
                        }
                        allocHandle.incMessagesRead(1);
                        pipeline.fireChannelRead(byteBuf);
                        // Ownership went to the pipeline; do not touch it in the error path.
                        byteBuf = null;
                        bufferRing.fillBuffer();
                        bid = bufferRing.nextBid(bid);
                        if (!allocHandle.continueReading()) {
                            // The handle asks to stop: signal read-complete and start a fresh
                            // accounting cycle, as the remaining buffers still must be delivered.
                            allocHandle.readComplete();
                            pipeline.fireChannelReadComplete();
                            allocHandle.reset(config());
                        }
                    }
                } else {
                    // Exactly one ring buffer holds all 'res' bytes.
                    int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
                    byteBuf = bufferRing.useBuffer(bid, res, more);
                    allocHandle.attemptedBytesRead(attemptedBytesRead);
                    allocHandle.lastBytesRead(res);
                }
            } else {
                // Data was written into our own buffer: record the attempt size first, then
                // advance the writer index by the number of bytes received.
                int attemptedBytesRead = byteBuf.writableBytes();
                byteBuf.writerIndex(byteBuf.writerIndex() + res);
                allocHandle.attemptedBytesRead(attemptedBytesRead);
                allocHandle.lastBytesRead(res);
            }
        } else {
            // res == 0 is recorded as -1, which is treated as "all data read" below.
            allocHandle.lastBytesRead(-1);
        }
        if (allocHandle.lastBytesRead() <= 0) {
            // Nothing was read: release our buffer (if any) as it will not be passed on.
            if (byteBuf != null) {
                byteBuf.release();
                byteBuf = null;
            }
            allDataRead = allocHandle.lastBytesRead() < 0;
            if (allDataRead) {
                shutdownInput(true);
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            return;
        }

        allocHandle.incMessagesRead(1);
        pipeline.fireChannelRead(byteBuf);
        // Ownership went to the pipeline; do not touch it in the error path.
        byteBuf = null;
        if (useBufferRing && !more) {
            bufferRing.fillBuffer();
        }
        scheduleNextRead(pipeline, allocHandle, rearm, empty);
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
    }
}
535
536 private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
537 boolean rearm, boolean empty) {
538 if (allocHandle.continueReading() && !empty) {
539 if (rearm) {
540
541
542 scheduleRead(false);
543 }
544 } else {
545
546 allocHandle.readComplete();
547 pipeline.fireChannelReadComplete();
548 }
549 }
550
551 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
552 Throwable cause, boolean allDataRead,
553 IoUringRecvByteAllocatorHandle allocHandle) {
554 if (byteBuf != null) {
555 if (byteBuf.isReadable()) {
556 pipeline.fireChannelRead(byteBuf);
557 } else {
558 byteBuf.release();
559 }
560 }
561 allocHandle.readComplete();
562 pipeline.fireChannelReadComplete();
563 pipeline.fireExceptionCaught(cause);
564 if (allDataRead || cause instanceof IOException) {
565 shutdownInput(true);
566 }
567 }
568
569 @Override
570 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
571 writeId = 0;
572 writeOpCode = 0;
573
574 ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
575 Object current = channelOutboundBuffer.current();
576 if (current instanceof IoUringFileRegion) {
577 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
578 try {
579 int result = res >= 0 ? res : ioResult("io_uring splice", res);
580 if (result == 0 && fileRegion.count() > 0) {
581 validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
582 return false;
583 }
584 int progress = fileRegion.handleResult(result, data);
585 if (progress == -1) {
586
587 channelOutboundBuffer.remove();
588 } else if (progress > 0) {
589 channelOutboundBuffer.progress(progress);
590 }
591 } catch (Throwable cause) {
592 handleWriteError(cause);
593 }
594 return true;
595 }
596
597 if (res >= 0) {
598 channelOutboundBuffer.removeBytes(res);
599 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
600 return true;
601 } else {
602 try {
603 if (ioResult("io_uring write", res) == 0) {
604 return false;
605 }
606 } catch (Throwable cause) {
607 handleWriteError(cause);
608 }
609 }
610 return true;
611 }
612
/**
 * Frees the resources via the super class and then checks that no read buffer is left behind.
 *
 * @param reg the registration passed on to the super class
 */
@Override
protected void freeResourcesNow(IoRegistration reg) {
    super.freeResourcesNow(reg);
    // Only checked when assertions are enabled: no read buffer may still be held at this point.
    assert readBuffer == null;
}
618 }
619
620 @Override
621 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
622 if (readId != 0) {
623
624 assert numOutstandingReads == 1 || numOutstandingReads == -1;
625 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, Native.IORING_OP_RECV);
626 long id = registration.submit(ops);
627 assert id != 0;
628 readId = 0;
629 }
630 }
631
632 @Override
633 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
634 if (writeId != 0) {
635
636
637 assert numOutstandingWrites == 1;
638 assert writeOpCode != 0;
639 long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
640 assert id != 0;
641 writeId = 0;
642 }
643 }
644
645 @Override
646 protected boolean socketIsEmpty(int flags) {
647 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
648 }
649
650 @Override
651 protected void submitAndRunNow() {
652 if (writeId != 0) {
653
654
655 ((IoUringIoHandler) registration().attachment()).submitAndRunNow(writeId);
656 }
657 super.submitAndRunNow();
658 }
659
660 @Override
661 boolean isPollInFirst() {
662 return bufferRing == null || !bufferRing.isUsable();
663 }
664 }