1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultFileRegion;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.IoRegistration;
29 import io.netty.channel.socket.DuplexChannel;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.io.IOException;
35 import java.net.SocketAddress;
36
37 import static io.netty.channel.unix.Errors.ioResult;
38
/**
 * Base class for io_uring based stream channels ({@link #isStreamSocket()} always returns {@code true}) that also
 * implement {@link DuplexChannel}, so input and output can be shut down independently.
 *
 * <p>Reads are submitted as {@code IORING_OP_RECV}: either into a buffer allocated through the receive allocator
 * handle, or, when a usable {@link IoUringBufferRing} is configured, into a buffer the kernel selects from that ring
 * (optionally multishot and/or bundled). Writes are submitted as one {@code writev} covering several flushed
 * messages, as a {@code send} for a single {@link ByteBuf}, or as a splice for an {@link IoUringFileRegion}.
 *
 * <p>At most one read and one write are tracked at a time via {@link #readId} and {@link #writeId}; an id of
 * {@code 0} means nothing is in flight.
 */
abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
    // Shared by all instances. The arguments are presumably (hasDisconnect = false,
    // defaultMaxMessagesPerRead = 16) -- confirm against the ChannelMetadata constructor.
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

    // Opcode of the write currently in flight; needed to build the async cancel in cancelOutstandingWrites(...).
    // Reset to 0 together with writeId in writeComplete0(...).
    byte writeOpCode;
    // Id returned by IoRegistration.submit(...) for the write currently in flight, or 0 if there is none.
    long writeId;
    // Opcode of the read currently in flight (always IORING_OP_RECV in this class); used for cancellation.
    byte readOpCode;
    // Id returned by IoRegistration.submit(...) for the read currently in flight, or 0 if there is none. For a
    // multishot recv it stays set across completions until one arrives without IORING_CQE_F_MORE.
    long readId;

    // Buffer ring looked up from the config's buffer group id once registration succeeded; remains null when
    // the configured buffer group id is negative.
    private IoUringBufferRing bufferRing;

    /**
     * Creates a channel around {@code socket}.
     *
     * @param active presumably whether the socket is already connected -- see AbstractIoUringChannel
     */
    AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
        super(parent, socket, active);
    }

    /**
     * Creates a channel around {@code socket} with a known remote address (semantics defined by the superclass).
     */
    AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
        super(parent, socket, remote);
    }

    /**
     * Always {@code true}: this class only backs stream sockets.
     */
    @Override
    protected final boolean isStreamSocket() {
        return true;
    }

    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }

    /**
     * Creates the {@link IoUringStreamUnsafe} that schedules reads/writes and handles their completions.
     */
    @Override
    protected AbstractUringUnsafe newUnsafe() {
        return new IoUringStreamUnsafe();
    }

    /**
     * Shuts down both directions of the socket, output first and then input.
     */
    @Override
    public final ChannelFuture shutdown() {
        return shutdown(newPromise());
    }

    /**
     * Shuts down the output side and, once that attempt has finished (successfully or not), the input side.
     * {@code promise} fails with the output failure if there is one, otherwise with the input failure, and
     * succeeds if both steps succeeded.
     *
     * @param promise completed once both shutdown attempts have finished
     * @return {@code promise}
     */
    @Override
    public final ChannelFuture shutdown(final ChannelPromise promise) {
        ChannelFuture shutdownOutputFuture = shutdownOutput();
        if (shutdownOutputFuture.isDone()) {
            shutdownOutputDone(shutdownOutputFuture, promise);
        } else {
            shutdownOutputFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
                    shutdownOutputDone(shutdownOutputFuture, promise);
                }
            });
        }
        return promise;
    }

    /**
     * Shuts down the write half of the socket. The (false, true) arguments mirror shutdownInput0(...), which
     * passes (true, false) to shut down the read half.
     */
    @Override
    protected final void doShutdownOutput() throws Exception {
        socket.shutdown(false, true);
    }

    /**
     * Shuts down the read half of the socket and completes {@code promise} with the outcome. The callers in this
     * class always invoke it on the event loop.
     */
    private void shutdownInput0(final ChannelPromise promise) {
        try {
            socket.shutdown(true, false);
            promise.setSuccess();
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
    }

    /**
     * Returns whether the socket's output side has been shut down.
     */
    @Override
    public final boolean isOutputShutdown() {
        return socket.isOutputShutdown();
    }

    /**
     * Returns whether the socket's input side has been shut down.
     */
    @Override
    public final boolean isInputShutdown() {
        return socket.isInputShutdown();
    }

    /**
     * Returns the socket's combined shutdown state as reported by {@code socket.isShutdown()}.
     */
    @Override
    public final boolean isShutdown() {
        return socket.isShutdown();
    }

    /**
     * Shuts down the output side of the socket.
     */
    @Override
    public final ChannelFuture shutdownOutput() {
        return shutdownOutput(newPromise());
    }

    /**
     * Shuts down the output side through the unsafe's {@code shutdownOutput(ChannelPromise)}, always running on
     * the event loop (the call is handed off to it when invoked from another thread).
     *
     * @return {@code promise}
     */
    @Override
    public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
        EventLoop loop = eventLoop();
        if (loop.inEventLoop()) {
            ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
        } else {
            loop.execute(new Runnable() {
                @Override
                public void run() {
                    ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
                }
            });
        }

        return promise;
    }

    /**
     * Shuts down the input side of the socket.
     */
    @Override
    public final ChannelFuture shutdownInput() {
        return shutdownInput(newPromise());
    }

    /**
     * Shuts down the input side via shutdownInput0(...), always running on the event loop (the call is handed
     * off to it when invoked from another thread).
     *
     * @return {@code promise}
     */
    @Override
    public final ChannelFuture shutdownInput(final ChannelPromise promise) {
        EventLoop loop = eventLoop();
        if (loop.inEventLoop()) {
            shutdownInput0(promise);
        } else {
            loop.execute(new Runnable() {
                @Override
                public void run() {
                    shutdownInput0(promise);
                }
            });
        }
        return promise;
    }

    /**
     * Second stage of {@link #shutdown(ChannelPromise)}: starts the input shutdown (regardless of how the output
     * shutdown ended) and completes {@code promise} once it has finished.
     */
    private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
        ChannelFuture shutdownInputFuture = shutdownInput();
        if (shutdownInputFuture.isDone()) {
            shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
        } else {
            shutdownInputFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
                    shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
                }
            });
        }
    }

    /**
     * Completes {@code promise} from the two shutdown results. An output failure takes precedence; if both steps
     * failed, the input failure is only logged at INFO level.
     */
    private static void shutdownDone(ChannelFuture shutdownOutputFuture,
                                     ChannelFuture shutdownInputFuture,
                                     ChannelPromise promise) {
        Throwable shutdownOutputCause = shutdownOutputFuture.cause();
        Throwable shutdownInputCause = shutdownInputFuture.cause();
        if (shutdownOutputCause != null) {
            if (shutdownInputCause != null) {
                logger.info("Exception suppressed because a previous exception occurred.",
                        shutdownInputCause);
            }
            promise.setFailure(shutdownOutputCause);
        } else if (shutdownInputCause != null) {
            promise.setFailure(shutdownInputCause);
        } else {
            promise.setSuccess();
        }
    }

    /**
     * Registers the channel. Only after the superclass registration succeeded, this resolves the configured
     * buffer ring and, if the channel is already active, schedules a POLLRDHUP poll; then {@code promise} is
     * completed.
     */
    @Override
    protected final void doRegister(ChannelPromise promise) {
        // Use an intermediate promise so bufferRing is set before the caller's promise is notified.
        ChannelPromise registerPromise = this.newPromise();

        registerPromise.addListener(f -> {
            if (f.isSuccess()) {
                try {
                    short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
                    if (bgid >= 0) {
                        final IoUringIoHandler ioUringIoHandler = registration().attachment();
                        bufferRing = ioUringIoHandler.findBufferRing(bgid);
                    }
                    if (active) {
                        // Already active at registration time, so start watching for the peer's half-close now.
                        schedulePollRdHup();
                    }
                } finally {
                    // NOTE(review): the promise is succeeded even if the lookup or schedulePollRdHup() throws;
                    // that exception then propagates out of this listener.
                    promise.setSuccess();
                }
            } else {
                promise.setFailure(f.cause());
            }
        });

        super.doRegister(registerPromise);
    }

    /**
     * Wraps a {@link DefaultFileRegion} in an {@link IoUringFileRegion} when splice is supported, so it can be
     * written with splice; every other message is filtered by the superclass.
     */
    @Override
    protected Object filterOutboundMessage(Object msg) {
        if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
            return new IoUringFileRegion((DefaultFileRegion) msg);
        }

        return super.filterOutboundMessage(msg);
    }

    /**
     * Unsafe implementation that submits io_uring reads and writes for a stream socket and handles their
     * completions.
     */
    protected class IoUringStreamUnsafe extends AbstractUringUnsafe {

        // Buffer handed to the in-flight plain recv (no buffer ring); null otherwise. readComplete0(...) takes
        // ownership of it when the completion arrives.
        private ByteBuf readBuffer;

        /**
         * Submits all flushed messages of {@code in} as a single {@code writev}. Their iovecs are appended to the
         * io handler's {@link IovArray} starting at its current count (earlier entries may already be present).
         *
         * @return {@code 1} if the write was submitted, {@code 0} if {@code submit} returned id {@code 0}
         */
        @Override
        protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
            assert writeId == 0;

            int fd = fd().intValue();
            IoRegistration registration = registration();
            IoUringIoHandler handler = registration.attachment();
            IovArray iovArray = handler.iovArray();
            int offset = iovArray.count();

            try {
                in.forEachFlushedMessage(filterWriteMultiple(iovArray));
            } catch (Exception e) {
                // Gathering failed: fall back to writing only the current message.
                // NOTE(review): iovecs already appended after `offset` are not removed here.
                return scheduleWriteSingle(in.current());
            }
            long iovArrayAddress = iovArray.memoryAddress(offset);
            int iovArrayLength = iovArray.count() - offset;

            IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArrayAddress, iovArrayLength, nextOpsId());

            byte opCode = ops.opcode();
            writeId = registration.submit(ops);
            writeOpCode = opCode;
            if (writeId == 0) {
                return 0;
            }
            return 1;
        }

        /**
         * Returns the processor that adds each flushed message to {@code iovArray}. By default that is
         * {@code iovArray} itself; subclasses may override this to filter messages.
         */
        protected ChannelOutboundBuffer.MessageProcessor filterWriteMultiple(IovArray iovArray) {
            return iovArray;
        }

        /**
         * Submits a single outbound message. An {@link IoUringFileRegion} is opened and written with splice;
         * anything else is cast to {@link ByteBuf} and its readable bytes are sent.
         *
         * @return {@code 1} if an operation was submitted; {@code 0} if {@code submit} returned id {@code 0} or
         *         opening the file region failed (that error is reported through handleWriteError)
         */
        @Override
        protected int scheduleWriteSingle(Object msg) {
            assert writeId == 0;

            int fd = fd().intValue();
            IoRegistration registration = registration();
            final IoUringIoOps ops;
            if (msg instanceof IoUringFileRegion) {
                IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
                try {
                    fileRegion.open();
                } catch (IOException e) {
                    this.handleWriteError(e);
                    return 0;
                }
                ops = fileRegion.splice(fd);
            } else {
                ByteBuf buf = (ByteBuf) msg;
                // Send directly from the buffer's memory, starting at its reader index.
                long address = IoUring.memoryAddress(buf) + buf.readerIndex();
                int length = buf.readableBytes();
                short opsid = nextOpsId();

                ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, opsid);
            }
            byte opCode = ops.opcode();
            writeId = registration.submit(ops);
            writeOpCode = opCode;
            if (writeId == 0) {
                return 0;
            }
            return 1;
        }

        /**
         * Returns the recv flags for a read: {@code 0} when {@code first} is set, {@code MSG_DONTWAIT} otherwise.
         */
        private int calculateRecvFlags(boolean first) {
            // Presumably: follow-up reads must not wait for data, so the read loop can end (and fire
            // channelReadComplete) promptly once the socket is drained.
            if (first) {
                return 0;
            }
            return Native.MSG_DONTWAIT;
        }

        /**
         * Returns the io priority for a recv. Only the first read may request IORING_RECVSEND_POLL_FIRST (arm the
         * poll before attempting the transfer). It does so only when the socket is reported empty and
         * IORING_CQE_F_SOCK_NONEMPTY reporting is supported; otherwise the priority is {@code 0}.
         */
        private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
            if (first) {
                // The support check presumably guards kernels lacking POLL_FIRST -- confirm.
                return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
                        Native.IORING_RECVSEND_POLL_FIRST : 0;
            }
            return 0;
        }

        /**
         * Schedules the next read. If a usable buffer ring is configured the kernel picks the buffer from it.
         * Otherwise a buffer is allocated through the allocator handle and a plain recv into its writable region
         * is submitted.
         *
         * @param first         whether this is the first read (affects recv flags and io priority)
         * @param socketIsEmpty whether the socket is believed to be empty (affects io priority on the first read)
         * @return {@code 1} if a single-shot recv was submitted, {@code -1} if a multishot recv was submitted
         *         (buffer-ring path only), {@code 0} if nothing was submitted
         */
        @Override
        protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
            assert readBuffer == null;
            assert readId == 0 : readId;
            final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();

            if (bufferRing != null && bufferRing.isUsable()) {
                return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
            }

            // No usable buffer ring: recv into a buffer we allocate ourselves.
            ByteBuf byteBuf = allocHandle.allocate(alloc());
            try {
                int fd = fd().intValue();
                IoRegistration registration = registration();
                short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
                int recvFlags = calculateRecvFlags(first);

                IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
                        IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
                readId = registration.submit(ops);
                readOpCode = Native.IORING_OP_RECV;
                if (readId == 0) {
                    return 0;
                }
                // Ownership moves to readBuffer until readComplete0(...) runs.
                readBuffer = byteBuf;
                byteBuf = null;
                return 1;
            } finally {
                // Only non-null if the submit failed or something threw before ownership was handed over.
                if (byteBuf != null) {
                    byteBuf.release();
                }
            }
        }

        /**
         * Submits a recv that lets the kernel pick a buffer from {@code bufferRing}. It uses IOSQE_BUFFER_SELECT
         * with the ring's buffer group id and passes address and length as {@code 0}.
         *
         * @return {@code -1} if a multishot recv was submitted, {@code 1} for a single-shot recv, {@code 0} if
         *         {@code submit} returned id {@code 0} or an IllegalArgumentException was raised (which is
         *         reported through handleReadException)
         */
        private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
            short bgId = bufferRing.bufferGroupId();
            try {
                boolean multishot = IoUring.isRecvMultishotEnabled();
                byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
                short ioPrio;
                final int recvFlags;
                if (multishot) {
                    ioPrio = Native.IORING_RECV_MULTISHOT;
                    recvFlags = 0;
                } else {
                    // The per-read flags/priority are only used for single-shot recvs; presumably because on a
                    // multishot recv they would keep applying until it is re-armed.
                    ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
                    recvFlags = calculateRecvFlags(first);
                }
                if (IoUring.isRecvsendBundleEnabled()) {
                    // Allows one completion to span several consecutive ring buffers; readComplete0(...) walks
                    // them via bufferRing.nextBid(...).
                    ioPrio |= Native.IORING_RECVSEND_BUNDLE;
                }
                IoRegistration registration = registration();
                int fd = fd().intValue();
                IoUringIoOps ops = IoUringIoOps.newRecv(
                        fd, flags, ioPrio, recvFlags, 0,
                        0, nextOpsId(), bgId
                );
                readId = registration.submit(ops);
                readOpCode = Native.IORING_OP_RECV;
                if (readId == 0) {
                    return 0;
                }
                if (multishot) {
                    // Signals that several completions may arrive for this single submission.
                    return -1;
                }
                return 1;
            } catch (IllegalArgumentException illegalArgumentException) {
                // NOTE(review): presumably raised by argument validation in IoUringIoOps.newRecv(...) -- confirm.
                this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
                return 0;
            }
        }

        /**
         * Handles a recv completion.
         *
         * @param op          the completed opcode (not used here)
         * @param res         the CQE result: bytes read, {@code 0} (treated as EOF) or a negative errno
         * @param flags       the CQE flags (IORING_CQE_F_MORE, IORING_CQE_F_BUFFER plus buffer id, ...)
         * @param data        operation data (not used here)
         * @param outstanding not used here
         */
        @Override
        protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
            // Take ownership of the plain-recv buffer; null when the buffer ring was used.
            ByteBuf byteBuf = readBuffer;
            readBuffer = null;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                // The read was cancelled (see cancelOutstandingReads): drop it without touching the pipeline.
                readId = 0;
                if (byteBuf != null) {
                    byteBuf.release();
                }
                return;
            }
            // Without IORING_CQE_F_MORE no further completions will arrive for this submission.
            boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
            boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
            // Id of the ring buffer the kernel filled; meaningful only when useBufferRing is true.
            short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
            // Passed through to bufferRing.useBuffer(...); NOTE(review): presumably signals a partially consumed
            // ring buffer -- confirm.
            boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;

            boolean empty = socketIsEmpty(flags);
            if (rearm) {
                // Nothing is in flight any more; a multishot recv that is still armed keeps its id.
                readId = 0;
            }

            boolean allDataRead = false;

            final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
            final ChannelPipeline pipeline = pipeline();

            try {
                if (res < 0) {
                    if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
                        // The ring had no free buffer. Try to grow it; if that is impossible tell the user.
                        if (!bufferRing.expand()) {
                            pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
                        }

                        // Retry without recording anything on the allocator handle, as no data was read.
                        scheduleRead(allocHandle.isFirstRead());
                        return;
                    }

                    // ioResult(...) presumably throws for real errors and returns 0 for retryable ones
                    // (e.g. EAGAIN); a 0 is handled by the lastBytesRead() <= 0 branch below.
                    allocHandle.lastBytesRead(ioResult("io_uring read", res));
                } else if (res > 0) {
                    if (useBufferRing) {
                        // With bundles one completion may cover several consecutive ring buffers: fire every
                        // buffer except the last one here; the last one is handled by the common path below.
                        int read = res;
                        for (;;) {
                            int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
                            byteBuf = bufferRing.useBuffer(bid, read, more);
                            read -= byteBuf.readableBytes();
                            allocHandle.attemptedBytesRead(attemptedBytesRead);
                            allocHandle.lastBytesRead(byteBuf.readableBytes());

                            assert read >= 0;
                            if (read == 0) {
                                // Last buffer of this completion; fired after the if/else below.
                                break;
                            }
                            allocHandle.incMessagesRead(1);
                            pipeline.fireChannelRead(byteBuf);
                            byteBuf = null;
                            bid = bufferRing.nextBid(bid);
                            if (!allocHandle.continueReading()) {
                                // End the current read loop and start a fresh one for the remaining buffers.
                                allocHandle.readComplete();
                                pipeline.fireChannelReadComplete();
                                allocHandle.reset(config());
                            }
                        }
                    } else {
                        // Plain recv: the kernel wrote res bytes at the buffer's writer index.
                        int attemptedBytesRead = byteBuf.writableBytes();
                        byteBuf.writerIndex(byteBuf.writerIndex() + res);
                        allocHandle.attemptedBytesRead(attemptedBytesRead);
                        allocHandle.lastBytesRead(res);
                    }
                } else {
                    // res == 0: EOF, recorded as -1 bytes read.
                    allocHandle.lastBytesRead(-1);
                }
                if (allocHandle.lastBytesRead() <= 0) {
                    // Nothing to deliver: release the (empty) buffer, if any.
                    if (byteBuf != null) {
                        byteBuf.release();
                        byteBuf = null;
                    }
                    allDataRead = allocHandle.lastBytesRead() < 0;
                    if (allDataRead) {
                        // EOF: nothing more can be read.
                        shutdownInput(true);
                    }
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
                    return;
                }

                allocHandle.incMessagesRead(1);
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
                scheduleNextRead(pipeline, allocHandle, rearm, empty);
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
            }
        }

        /**
         * Continues or ends the current read loop after data was delivered. If the allocator handle wants more and
         * the socket was not reported empty, a new read is scheduled only when {@code rearm} is set. A multishot
         * recv that is still armed produces further completions on its own. Otherwise the read loop ends with
         * channelReadComplete.
         */
        private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
                                      boolean rearm, boolean empty) {
            if (allocHandle.continueReading() && !empty) {
                if (rearm) {
                    scheduleRead(false);
                }
            } else {
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
            }
        }

        /**
         * Reports a read failure to the pipeline. It first delivers {@code byteBuf} if it still holds readable
         * data, otherwise releases it. It then completes the read loop and fires exceptionCaught. Finally it
         * shuts down input if all data was read or {@code cause} is an {@link IOException}.
         *
         * @param byteBuf     buffer that was being read into; may be {@code null}
         * @param allDataRead whether EOF was already observed
         */
        protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
                                                 Throwable cause, boolean allDataRead,
                                                 IoUringRecvByteAllocatorHandle allocHandle) {
            if (byteBuf != null) {
                if (byteBuf.isReadable()) {
                    pipeline.fireChannelRead(byteBuf);
                } else {
                    byteBuf.release();
                }
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            pipeline.fireExceptionCaught(cause);
            if (allDataRead || cause instanceof IOException) {
                shutdownInput(true);
            }
        }

        /**
         * Handles a splice completion for {@code fileRegion}.
         *
         * @return {@code false} if the splice transferred nothing while {@code fileRegion.count()} is positive
         *         and validateFileRegion(...) returned normally; {@code true} otherwise, including cancellation
         *         and errors, which are reported through handleWriteError(...)
         */
        private boolean handleWriteCompleteFileRegion(ChannelOutboundBuffer channelOutboundBuffer,
                                                      IoUringFileRegion fileRegion, int res, short data) {
            try {
                if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                    return true;
                }
                int result = res >= 0 ? res : ioResult("io_uring splice", res);
                if (result == 0 && fileRegion.count() > 0) {
                    validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
                    return false;
                }
                int progress = fileRegion.handleResult(result, data);
                if (progress == -1) {
                    // -1 removes the region from the outbound buffer (presumably: fully transferred).
                    channelOutboundBuffer.remove();
                } else if (progress > 0) {
                    channelOutboundBuffer.progress(progress);
                }
            } catch (Throwable cause) {
                handleWriteError(cause);
            }
            return true;
        }

        /**
         * Handles a write completion (writev, send or splice).
         *
         * @return {@code false} if no progress was made (a retryable error for which ioResult(...) returned 0,
         *         or a splice that transferred nothing), {@code true} otherwise
         */
        @Override
        boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
            if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
                // A CQE with IORING_CQE_F_NOTIF is an extra notification (used by io_uring zero-copy sends), not
                // the write's own completion, so only clear the in-flight state for regular completions.
                writeId = 0;
                writeOpCode = 0;
            }
            ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
            Object current = channelOutboundBuffer.current();
            if (current instanceof IoUringFileRegion) {
                IoUringFileRegion fileRegion = (IoUringFileRegion) current;
                return handleWriteCompleteFileRegion(channelOutboundBuffer, fileRegion, res, data);
            }

            if (res >= 0) {
                channelOutboundBuffer.removeBytes(res);
            } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                // Cancelled (see cancelOutstandingWrites): nothing to account for.
                return true;
            } else {
                try {
                    // ioResult(...) presumably returns 0 for retryable errors and throws for real ones.
                    if (ioResult("io_uring write", res) == 0) {
                        return false;
                    }
                } catch (Throwable cause) {
                    handleWriteError(cause);
                }
            }
            return true;
        }

        @Override
        public void unregistered() {
            super.unregistered();
            // Any plain-recv buffer must have been consumed or released by readComplete0(...) by now.
            assert readBuffer == null;
        }
    }

    /**
     * Submits an async cancel for the in-flight read, if any, and clears {@link #readId}. The cancelled read is
     * expected to complete later with -ECANCELED, which readComplete0(...) handles.
     *
     * @param numOutstandingReads presumably the value scheduleRead0(...) returned: {@code 1} for a single-shot
     *                            recv, {@code -1} for a multishot recv
     */
    @Override
    protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
        if (readId != 0) {
            assert numOutstandingReads == 1 || numOutstandingReads == -1;
            IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
            long id = registration.submit(ops);
            assert id != 0;
            readId = 0;
        }
    }

    /**
     * Submits an async cancel for the in-flight write, if any, and clears {@link #writeId}. At most one write is
     * expected to be outstanding.
     */
    @Override
    protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
        if (writeId != 0) {
            assert numOutstandingWrites == 1;
            assert writeOpCode != 0;
            long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
            assert id != 0;
            writeId = 0;
        }
    }

    /**
     * Returns {@code true} only if IORING_CQE_F_SOCK_NONEMPTY is supported and not set in {@code flags}. Without
     * kernel support the socket is never assumed to be empty.
     */
    @Override
    protected boolean socketIsEmpty(int flags) {
        return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
    }

    /**
     * Returns {@code true} unless a usable buffer ring is configured.
     * NOTE(review): presumably tells the base class whether to wait for POLLIN before reading -- confirm.
     */
    @Override
    boolean isPollInFirst() {
        return bufferRing == null || !bufferRing.isUsable();
    }
}