1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultFileRegion;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.IoRegistration;
29 import io.netty.channel.socket.DuplexChannel;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.io.IOException;
35 import java.net.SocketAddress;
36
37 import static io.netty.channel.unix.Errors.ioResult;
38
39 abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
// Logger used by shutdownDone(...) to report a shutdown-input failure that is suppressed
// because the shutdown-output failure is the one propagated to the promise.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
// Shared metadata instance returned by metadata().
// NOTE(review): per the ChannelMetadata constructor these arguments should mean
// hasDisconnect = false and defaultMaxMessagesPerRead = 16 - confirm against the Netty Javadoc.
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);


// Opcode of the write operation that was submitted last. It is captured when the write is
// scheduled, handed to the async-cancel operation in cancelOutstandingWrites(...) and reset
// to 0 in writeComplete0(...).
byte writeOpCode;

// Id returned by IoRegistration.submit(...) for the in-flight write; 0 means no write is outstanding.
long writeId;
// Opcode of the read operation that was submitted last; handed to the async-cancel operation
// in cancelOutstandingReads(...).
byte readOpCode;
// Id returned by IoRegistration.submit(...) for the in-flight read; 0 means no read is outstanding.
long readId;


// Buffer ring resolved in doRegister(...) from the configured buffer group id.
// Stays null when the configured id is negative.
private IoUringBufferRing bufferRing;
52
/**
 * Creates a new stream channel on top of the given socket. All arguments are forwarded
 * unchanged to the superclass constructor.
 *
 * @param parent the parent channel
 * @param socket the socket this channel operates on
 * @param active forwarded to the superclass; presumably whether the socket is already
 *               connected (TODO confirm against {@code AbstractIoUringChannel})
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
    super(parent, socket, active);
}
56
/**
 * Creates a new stream channel on top of the given socket. All arguments are forwarded
 * unchanged to the superclass constructor.
 *
 * @param parent the parent channel
 * @param socket the socket this channel operates on
 * @param remote the remote address handed to the superclass
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
    super(parent, socket, remote);
}
60
/**
 * Returns the metadata shared by all stream channels of this type.
 *
 * @return the static {@code METADATA} instance
 */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
65
/**
 * Creates the unsafe implementation used by this channel.
 *
 * @return a new {@link IoUringStreamUnsafe}
 */
@Override
protected AbstractUringUnsafe newUnsafe() {
    return new IoUringStreamUnsafe();
}
70
71 @Override
72 public final ChannelFuture shutdown() {
73 return shutdown(newPromise());
74 }
75
76 @Override
77 public final ChannelFuture shutdown(final ChannelPromise promise) {
78 ChannelFuture shutdownOutputFuture = shutdownOutput();
79 if (shutdownOutputFuture.isDone()) {
80 shutdownOutputDone(shutdownOutputFuture, promise);
81 } else {
82 shutdownOutputFuture.addListener(new ChannelFutureListener() {
83 @Override
84 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
85 shutdownOutputDone(shutdownOutputFuture, promise);
86 }
87 });
88 }
89 return promise;
90 }
91
/**
 * Shuts down the output side of the underlying socket by calling
 * {@code socket.shutdown(false, true)}.
 *
 * @throws Exception if the socket call fails
 */
@Override
protected final void doShutdownOutput() throws Exception {
    socket.shutdown(false, true);
}
96
/**
 * Shuts down the input side of the underlying socket by calling
 * {@code socket.shutdown(true, false)} and completes the promise with the outcome.
 *
 * @param promise succeeded when the socket call returns normally, failed with the thrown
 *                {@link Throwable} otherwise
 */
private void shutdownInput0(final ChannelPromise promise) {
    try {
        socket.shutdown(true, false);
        promise.setSuccess();
    } catch (Throwable cause) {
        // Anything thrown inside the try block is reported through the promise instead of
        // being propagated to the caller.
        promise.setFailure(cause);
    }
}
105
/**
 * @return the output-shutdown state as reported by the underlying socket
 */
@Override
public final boolean isOutputShutdown() {
    return socket.isOutputShutdown();
}
110
/**
 * @return the input-shutdown state as reported by the underlying socket
 */
@Override
public final boolean isInputShutdown() {
    return socket.isInputShutdown();
}
115
/**
 * @return the shutdown state as reported by the underlying socket's {@code isShutdown()}
 */
@Override
public final boolean isShutdown() {
    return socket.isShutdown();
}
120
121 @Override
122 public final ChannelFuture shutdownOutput() {
123 return shutdownOutput(newPromise());
124 }
125
126 @Override
127 public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
128 EventLoop loop = eventLoop();
129 if (loop.inEventLoop()) {
130 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
131 } else {
132 loop.execute(new Runnable() {
133 @Override
134 public void run() {
135 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
136 }
137 });
138 }
139
140 return promise;
141 }
142
143 @Override
144 public final ChannelFuture shutdownInput() {
145 return shutdownInput(newPromise());
146 }
147
148 @Override
149 public final ChannelFuture shutdownInput(final ChannelPromise promise) {
150 EventLoop loop = eventLoop();
151 if (loop.inEventLoop()) {
152 shutdownInput0(promise);
153 } else {
154 loop.execute(new Runnable() {
155 @Override
156 public void run() {
157 shutdownInput0(promise);
158 }
159 });
160 }
161 return promise;
162 }
163
164 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
165 ChannelFuture shutdownInputFuture = shutdownInput();
166 if (shutdownInputFuture.isDone()) {
167 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
168 } else {
169 shutdownInputFuture.addListener(new ChannelFutureListener() {
170 @Override
171 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
172 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
173 }
174 });
175 }
176 }
177
178 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
179 ChannelFuture shutdownInputFuture,
180 ChannelPromise promise) {
181 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
182 Throwable shutdownInputCause = shutdownInputFuture.cause();
183 if (shutdownOutputCause != null) {
184 if (shutdownInputCause != null) {
185 logger.info("Exception suppressed because a previous exception occurred.",
186 shutdownInputCause);
187 }
188 promise.setFailure(shutdownOutputCause);
189 } else if (shutdownInputCause != null) {
190 promise.setFailure(shutdownInputCause);
191 } else {
192 promise.setSuccess();
193 }
194 }
195
196 @Override
197 protected final void doRegister(ChannelPromise promise) {
198 ChannelPromise registerPromise = this.newPromise();
199
200 registerPromise.addListener(f -> {
201 if (f.isSuccess()) {
202 try {
203 short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
204 if (bgid >= 0) {
205 final IoUringIoHandler ioUringIoHandler = registration().attachment();
206 bufferRing = ioUringIoHandler.findBufferRing(bgid);
207 }
208 if (active) {
209
210 schedulePollRdHup();
211 }
212 } finally {
213 promise.setSuccess();
214 }
215 } else {
216 promise.setFailure(f.cause());
217 }
218 });
219
220 super.doRegister(registerPromise);
221 }
222
223 @Override
224 protected Object filterOutboundMessage(Object msg) {
225
226
227 if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
228 return new IoUringFileRegion((DefaultFileRegion) msg);
229 }
230
231 return super.filterOutboundMessage(msg);
232 }
233
234 protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
235
// Buffer handed to the in-flight recv submitted by scheduleRead0(...) without a buffer ring.
// Taken (and reset to null) by readComplete0(...); null while no such read is outstanding.
private ByteBuf readBuffer;
237
238 @Override
239 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
240 assert writeId == 0;
241
242 int fd = fd().intValue();
243 IoRegistration registration = registration();
244 IoUringIoHandler handler = registration.attachment();
245 IovArray iovArray = handler.iovArray();
246 int offset = iovArray.count();
247
248 try {
249 in.forEachFlushedMessage(filterWriteMultiple(iovArray));
250 } catch (Exception e) {
251
252 return scheduleWriteSingle(in.current());
253 }
254 long iovArrayAddress = iovArray.memoryAddress(offset);
255 int iovArrayLength = iovArray.count() - offset;
256
257 IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArrayAddress, iovArrayLength, nextOpsId());
258
259 byte opCode = ops.opcode();
260 writeId = registration.submit(ops);
261 writeOpCode = opCode;
262 if (writeId == 0) {
263 return 0;
264 }
265 return 1;
266 }
267
/**
 * Returns the processor that {@code scheduleWriteMultiple(...)} passes to
 * {@code ChannelOutboundBuffer.forEachFlushedMessage(...)}. This implementation returns the
 * given {@link IovArray} itself; the method is overridable by subclasses.
 *
 * @param iovArray the array the flushed messages are collected into
 * @return the processor to use, here {@code iovArray}
 */
protected ChannelOutboundBuffer.MessageProcessor filterWriteMultiple(IovArray iovArray) {
    return iovArray;
}
271
272 @Override
273 protected int scheduleWriteSingle(Object msg) {
274 assert writeId == 0;
275
276 int fd = fd().intValue();
277 IoRegistration registration = registration();
278 final IoUringIoOps ops;
279 if (msg instanceof IoUringFileRegion) {
280 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
281 try {
282 fileRegion.open();
283 } catch (IOException e) {
284 this.handleWriteError(e);
285 return 0;
286 }
287 ops = fileRegion.splice(fd);
288 } else {
289 ByteBuf buf = (ByteBuf) msg;
290 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
291 int length = buf.readableBytes();
292 short opsid = nextOpsId();
293
294 ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, opsid);
295 }
296 byte opCode = ops.opcode();
297 writeId = registration.submit(ops);
298 writeOpCode = opCode;
299 if (writeId == 0) {
300 return 0;
301 }
302 return 1;
303 }
304
305 private int calculateRecvFlags(boolean first) {
306
307
308
309
310
311
312 if (first) {
313 return 0;
314 }
315 return Native.MSG_DONTWAIT;
316 }
317
318 private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
319
320
321
322 if (first) {
323
324
325 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
326 Native.IORING_RECVSEND_POLL_FIRST : 0;
327 }
328 return 0;
329 }
330
331 @Override
332 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
333 assert readBuffer == null;
334 assert readId == 0 : readId;
335 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
336
337 if (bufferRing != null && bufferRing.isUsable()) {
338 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
339 }
340
341
342 ByteBuf byteBuf = allocHandle.allocate(alloc());
343 try {
344 int fd = fd().intValue();
345 IoRegistration registration = registration();
346 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
347 int recvFlags = calculateRecvFlags(first);
348
349 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
350 IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
351 readId = registration.submit(ops);
352 readOpCode = Native.IORING_OP_RECV;
353 if (readId == 0) {
354 return 0;
355 }
356 readBuffer = byteBuf;
357 byteBuf = null;
358 return 1;
359 } finally {
360 if (byteBuf != null) {
361 byteBuf.release();
362 }
363 }
364 }
365
366 private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
367 short bgId = bufferRing.bufferGroupId();
368 try {
369 boolean multishot = IoUring.isRecvMultishotEnabled();
370 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
371 short ioPrio;
372 final int recvFlags;
373 if (multishot) {
374 ioPrio = Native.IORING_RECV_MULTISHOT;
375 recvFlags = 0;
376 } else {
377
378
379 ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
380 recvFlags = calculateRecvFlags(first);
381 }
382 if (IoUring.isRecvsendBundleEnabled()) {
383
384
385 ioPrio |= Native.IORING_RECVSEND_BUNDLE;
386 }
387 IoRegistration registration = registration();
388 int fd = fd().intValue();
389 IoUringIoOps ops = IoUringIoOps.newRecv(
390 fd, flags, ioPrio, recvFlags, 0,
391 0, nextOpsId(), bgId
392 );
393 readId = registration.submit(ops);
394 readOpCode = Native.IORING_OP_RECV;
395 if (readId == 0) {
396 return 0;
397 }
398 if (multishot) {
399
400 return -1;
401 }
402 return 1;
403 } catch (IllegalArgumentException illegalArgumentException) {
404 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
405 return 0;
406 }
407 }
408
/**
 * Handles the completion of a recv that was submitted by {@code scheduleRead0(...)} or
 * {@code scheduleReadProviderBuffer(...)}: accounts the result on the receive allocator
 * handle, fires the read events on the pipeline and decides whether another read is scheduled.
 *
 * @param op          the opcode of the completed operation (not used here)
 * @param res         the result of the operation: bytes received, 0, or a negative errno value
 * @param flags       the completion flags; inspected for {@code IORING_CQE_F_MORE},
 *                    {@code IORING_CQE_F_BUFFER}, {@code IORING_CQE_F_BUF_MORE} and the buffer id
 * @param data        user data of the completion (not used here)
 * @param outstanding number of outstanding operations (not used here)
 */
@Override
protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
    // Take ownership of the buffer of a recv without buffer ring; it is null when the recv
    // was scheduled through the buffer ring, which never sets readBuffer.
    ByteBuf byteBuf = readBuffer;
    readBuffer = null;
    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        // The read was cancelled: no read is in flight anymore and nothing is reported
        // to the pipeline.
        readId = 0;
        if (byteBuf != null) {
            // Recv without buffer ring: the buffer was never filled, release it.
            byteBuf.release();
        }
        return;
    }
    // Without IORING_CQE_F_MORE a new read must be scheduled to keep reading.
    // NOTE(review): presumably MORE is only set while a multishot recv stays armed - confirm
    // against the io_uring documentation.
    boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
    // IORING_CQE_F_BUFFER is set when the data was placed in a buffer of the buffer ring.
    boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
    // The id of that buffer is stored in the upper bits of the flags.
    short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
    boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;

    boolean empty = socketIsEmpty(flags);
    if (rearm) {
        // No further completion is expected for the submitted read, so forget its id.
        readId = 0;
    }

    boolean allDataRead = false;

    final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    final ChannelPipeline pipeline = pipeline();

    try {
        if (res < 0) {
            if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
                // The buffer ring ran out of buffers: try to grow it.
                if (!bufferRing.expand()) {
                    // Growing was not possible: notify the user through the pipeline.
                    pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
                }

                // Schedule the read again; this completion is not accounted on the
                // allocator handle as it did not transfer any data.
                scheduleRead(allocHandle.isFirstRead());
                return;
            }

            // Any other negative result goes through ioResult(...).
            // NOTE(review): ioResult presumably throws for real errors and returns 0 when the
            // socket was simply not readable - confirm against io.netty.channel.unix.Errors.
            allocHandle.lastBytesRead(ioResult("io_uring read", res));
        } else if (res > 0) {
            if (useBufferRing) {
                // The received bytes may span more than one buffer of the ring, so buffers are
                // taken one after another until all "res" bytes are accounted for.
                int read = res;
                for (;;) {
                    int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
                    byteBuf = bufferRing.useBuffer(bid, read, more);
                    read -= byteBuf.readableBytes();
                    allocHandle.attemptedBytesRead(attemptedBytesRead);
                    allocHandle.lastBytesRead(byteBuf.readableBytes());

                    assert read >= 0;
                    if (read == 0) {
                        // Last buffer of this completion: it is fired by the common code
                        // after the loop.
                        break;
                    }
                    allocHandle.incMessagesRead(1);
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    bid = bufferRing.nextBid(bid);
                    if (!allocHandle.continueReading()) {
                        // The handle asks to stop, but data is still pending in the ring:
                        // finish this read loop on the pipeline and start a new one.
                        allocHandle.readComplete();
                        pipeline.fireChannelReadComplete();
                        allocHandle.reset(config());
                    }
                }
            } else {
                // Recv into the buffer allocated by scheduleRead0(...): make the received
                // bytes readable by advancing the writer index.
                int attemptedBytesRead = byteBuf.writableBytes();
                byteBuf.writerIndex(byteBuf.writerIndex() + res);
                allocHandle.attemptedBytesRead(attemptedBytesRead);
                allocHandle.lastBytesRead(res);
            }
        } else {
            // res == 0 is recorded as -1, which is treated as "all data read" below.
            allocHandle.lastBytesRead(-1);
        }
        if (allocHandle.lastBytesRead() <= 0) {
            // Nothing was read. byteBuf may be null here when the buffer ring was used.
            if (byteBuf != null) {
                byteBuf.release();
                byteBuf = null;
            }
            allDataRead = allocHandle.lastBytesRead() < 0;
            if (allDataRead) {
                // Nothing is left to read: shut down the input side.
                shutdownInput(true);
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            return;
        }

        allocHandle.incMessagesRead(1);
        pipeline.fireChannelRead(byteBuf);
        // Ownership was passed to the pipeline; clear the local so the catch block does
        // not touch the buffer again.
        byteBuf = null;
        scheduleNextRead(pipeline, allocHandle, rearm, empty);
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
    }
}
527
528 private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
529 boolean rearm, boolean empty) {
530 if (allocHandle.continueReading() && !empty) {
531 if (rearm) {
532
533
534 scheduleRead(false);
535 }
536 } else {
537
538 allocHandle.readComplete();
539 pipeline.fireChannelReadComplete();
540 }
541 }
542
543 protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
544 Throwable cause, boolean allDataRead,
545 IoUringRecvByteAllocatorHandle allocHandle) {
546 if (byteBuf != null) {
547 if (byteBuf.isReadable()) {
548 pipeline.fireChannelRead(byteBuf);
549 } else {
550 byteBuf.release();
551 }
552 }
553 allocHandle.readComplete();
554 pipeline.fireChannelReadComplete();
555 pipeline.fireExceptionCaught(cause);
556 if (allDataRead || cause instanceof IOException) {
557 shutdownInput(true);
558 }
559 }
560
561 private boolean handleWriteCompleteFileRegion(ChannelOutboundBuffer channelOutboundBuffer,
562 IoUringFileRegion fileRegion, int res, short data) {
563 try {
564 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
565 return true;
566 }
567 int result = res >= 0 ? res : ioResult("io_uring splice", res);
568 if (result == 0 && fileRegion.count() > 0) {
569 validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
570 return false;
571 }
572 int progress = fileRegion.handleResult(result, data);
573 if (progress == -1) {
574
575 channelOutboundBuffer.remove();
576 } else if (progress > 0) {
577 channelOutboundBuffer.progress(progress);
578 }
579 } catch (Throwable cause) {
580 handleWriteError(cause);
581 }
582 return true;
583 }
584
585 @Override
586 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
587 if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
588
589
590
591
592 writeId = 0;
593 writeOpCode = 0;
594 }
595 ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
596 Object current = channelOutboundBuffer.current();
597 if (current instanceof IoUringFileRegion) {
598 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
599 return handleWriteCompleteFileRegion(channelOutboundBuffer, fileRegion, res, data);
600 }
601
602 if (res >= 0) {
603 channelOutboundBuffer.removeBytes(res);
604 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
605 return true;
606 } else {
607 try {
608 if (ioResult("io_uring write", res) == 0) {
609 return false;
610 }
611 } catch (Throwable cause) {
612 handleWriteError(cause);
613 }
614 }
615 return true;
616 }
617
/**
 * Delegates to the superclass and then asserts that no read buffer is held anymore.
 */
@Override
public void unregistered() {
    super.unregistered();
    // A buffer still referenced here would never be released.
    assert readBuffer == null;
}
623 }
624
625 @Override
626 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
627 if (readId != 0) {
628
629 assert numOutstandingReads == 1 || numOutstandingReads == -1;
630 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
631 long id = registration.submit(ops);
632 assert id != 0;
633 readId = 0;
634 }
635 }
636
637 @Override
638 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
639 if (writeId != 0) {
640
641
642 assert numOutstandingWrites == 1;
643 assert writeOpCode != 0;
644 long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
645 assert id != 0;
646 writeId = 0;
647 }
648 }
649
650 @Override
651 protected boolean socketIsEmpty(int flags) {
652 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
653 }
654
655 @Override
656 boolean isPollInFirst() {
657 return bufferRing == null || !bufferRing.isUsable();
658 }
659 }