View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultFileRegion;
28  import io.netty.channel.EventLoop;
29  import io.netty.channel.FileRegion;
30  import io.netty.channel.IoRegistration;
31  import io.netty.channel.socket.DuplexChannel;
32  import io.netty.channel.unix.IovArray;
33  import io.netty.util.internal.SystemPropertyUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.net.SocketAddress;
39  import java.nio.ByteBuffer;
40  import java.nio.channels.WritableByteChannel;
41  
42  import static io.netty.channel.unix.Errors.ioResult;
43  
44  abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
    // Shared metadata instance handed out by metadata().
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

    /**
     * Maximum bytes per chunk when converting a generic {@link FileRegion} to a {@link ByteBuf}
     * for the io_uring async send path. Overridable via the {@code io.netty.iouring.fileRegionChunkSize}
     * system property; capped at 16 MiB to guard against pathological configurations that would
     * risk direct-memory OOM. The effective value is clamped to the range [1, 16 MiB] and
     * defaults to 64 KiB when the property is not set.
     */
    private static final int FILE_REGION_MAX_CHUNK_SIZE = Math.min(16 * 1024 * 1024,
            Math.max(1, SystemPropertyUtil.getInt("io.netty.iouring.fileRegionChunkSize", 64 * 1024)));

    // Opcode of the most recently submitted write operation, recorded right after submission
    // so the kind of operation that is in flight is known.
    byte writeOpCode;
    // Keep track of the ids used for write and read so we can cancel these when needed.
    // A value of 0 means that no operation of that kind is currently tracked.
    long writeId;
    byte readOpCode;
    long readId;

    // The configured buffer ring if any; assigned once registration succeeded (see doRegister(...)).
    private IoUringBufferRing bufferRing;
66  
    /**
     * Creates a new instance, forwarding all arguments unchanged to the superclass constructor.
     *
     * @param parent the parent {@link Channel} handed to the superclass
     * @param socket the {@link LinuxSocket} handed to the superclass
     * @param active the initial "active" flag handed to the superclass
     */
    AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
        super(parent, socket, active);
    }
70  
    /**
     * Creates a new instance, forwarding all arguments unchanged to the superclass constructor.
     *
     * @param parent the parent {@link Channel} handed to the superclass
     * @param socket the {@link LinuxSocket} handed to the superclass
     * @param remote the remote {@link SocketAddress} handed to the superclass
     */
    AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
        super(parent, socket, remote);
    }
74  
    /**
     * Always reports {@code true}: every subclass of this type is a stream channel.
     */
    @Override
    protected final boolean isStreamSocket() {
        return true;
    }
79  
    /**
     * Returns the shared, statically created {@link ChannelMetadata} of this channel type.
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
84  
    /**
     * Creates the {@link IoUringStreamUnsafe} that implements the stream specific read / write scheduling.
     */
    @Override
    protected AbstractUringUnsafe newUnsafe() {
        return new IoUringStreamUnsafe();
    }
89  
90      @Override
91      public final ChannelFuture shutdown() {
92          return shutdown(newPromise());
93      }
94  
95      @Override
96      public final ChannelFuture shutdown(final ChannelPromise promise) {
97          ChannelFuture shutdownOutputFuture = shutdownOutput();
98          if (shutdownOutputFuture.isDone()) {
99              shutdownOutputDone(shutdownOutputFuture, promise);
100         } else {
101             shutdownOutputFuture.addListener(new ChannelFutureListener() {
102                 @Override
103                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
104                     shutdownOutputDone(shutdownOutputFuture, promise);
105                 }
106             });
107         }
108         return promise;
109     }
110 
    /**
     * Shuts down the output side of the underlying socket.
     *
     * @throws Exception if {@code socket.shutdown(...)} fails
     */
    @Override
    protected final void doShutdownOutput() throws Exception {
        // NOTE(review): arguments are presumably (read, write) - confirm against LinuxSocket.shutdown(...).
        socket.shutdown(false, true);
    }
115 
    /**
     * Shuts down the input side of the underlying socket and completes {@code promise} with the outcome.
     * Callers in this class only invoke it from the event loop.
     *
     * @param promise completed successfully if the socket call returns, failed with the thrown cause otherwise
     */
    private void shutdownInput0(final ChannelPromise promise) {
        try {
            // NOTE(review): arguments are presumably (read, write) - confirm against LinuxSocket.shutdown(...).
            socket.shutdown(true, false);
            promise.setSuccess();
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
    }
124 
    /**
     * Reports whether the output side is shut down, as answered by the underlying socket.
     */
    @Override
    public final boolean isOutputShutdown() {
        return socket.isOutputShutdown();
    }
129 
    /**
     * Reports whether the input side is shut down, as answered by the underlying socket.
     */
    @Override
    public final boolean isInputShutdown() {
        return socket.isInputShutdown();
    }
134 
    /**
     * Reports the shutdown state of the underlying socket by delegating to {@code socket.isShutdown()}.
     */
    @Override
    public final boolean isShutdown() {
        return socket.isShutdown();
    }
139 
140     @Override
141     public final ChannelFuture shutdownOutput() {
142         return shutdownOutput(newPromise());
143     }
144 
145     @Override
146     public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
147         EventLoop loop = eventLoop();
148         if (loop.inEventLoop()) {
149             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
150         } else {
151             loop.execute(new Runnable() {
152                 @Override
153                 public void run() {
154                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
155                 }
156             });
157         }
158 
159         return promise;
160     }
161 
162     @Override
163     public final ChannelFuture shutdownInput() {
164         return shutdownInput(newPromise());
165     }
166 
167     @Override
168     public final ChannelFuture shutdownInput(final ChannelPromise promise) {
169         EventLoop loop = eventLoop();
170         if (loop.inEventLoop()) {
171             shutdownInput0(promise);
172         } else {
173             loop.execute(new Runnable() {
174                 @Override
175                 public void run() {
176                     shutdownInput0(promise);
177                 }
178             });
179         }
180         return promise;
181     }
182 
183     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
184         ChannelFuture shutdownInputFuture = shutdownInput();
185         if (shutdownInputFuture.isDone()) {
186             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
187         } else {
188             shutdownInputFuture.addListener(new ChannelFutureListener() {
189                 @Override
190                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
191                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
192                 }
193             });
194         }
195     }
196 
197     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
198                                      ChannelFuture shutdownInputFuture,
199                                      ChannelPromise promise) {
200         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
201         Throwable shutdownInputCause = shutdownInputFuture.cause();
202         if (shutdownOutputCause != null) {
203             if (shutdownInputCause != null) {
204                 logger.info("Exception suppressed because a previous exception occurred.",
205                              shutdownInputCause);
206             }
207             promise.setFailure(shutdownOutputCause);
208         } else if (shutdownInputCause != null) {
209             promise.setFailure(shutdownInputCause);
210         } else {
211             promise.setSuccess();
212         }
213     }
214 
215     @Override
216     protected final void doRegister(ChannelPromise promise) {
217         ChannelPromise registerPromise = this.newPromise();
218         // Ensure that the buffer group is properly set before channel::read
219         registerPromise.addListener(f -> {
220             if (f.isSuccess()) {
221                try {
222                    short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
223                    if (bgid >= 0) {
224                        final IoUringIoHandler ioUringIoHandler = registration().attachment();
225                        bufferRing = ioUringIoHandler.findBufferRing(bgid);
226                    }
227                    if (active) {
228                        // Register for POLLRDHUP if this channel is already considered active.
229                        schedulePollRdHup();
230                    }
231                } finally {
232                    promise.setSuccess();
233                }
234             } else {
235                 promise.setFailure(f.cause());
236             }
237         });
238 
239         super.doRegister(registerPromise);
240     }
241 
242     @Override
243     protected Object filterOutboundMessage(Object msg) {
244         if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
245             return new IoUringFileRegion((DefaultFileRegion) msg);
246         }
247 
248         if (msg instanceof FileRegion) {
249             // Generic FileRegion -- pass through to the write path for chunked conversion.
250             return msg;
251         }
252 
253         return super.filterOutboundMessage(msg);
254     }
255 
256     protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
257 
258         private ByteBuf readBuffer;
259 
260         // Chunk buffer for generic FileRegion writes. Non-null while a send is in flight.
261         private ByteBuf fileRegionChunkBuf;
262 
263         @Override
264         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
265             assert writeId == 0;
266 
267             int fd = fd().intValue();
268             IoRegistration registration = registration();
269             IoUringIoHandler handler = registration.attachment();
270             IovArray iovArray = handler.iovArray();
271             int offset = iovArray.count();
272 
273             try {
274                 in.forEachFlushedMessage(filterWriteMultiple(iovArray));
275             } catch (Exception e) {
276                 // This should never happen, anyway fallback to single write.
277                 return scheduleWriteSingle(in.current());
278             }
279             long iovArrayAddress = iovArray.memoryAddress(offset);
280             int iovArrayLength = iovArray.count() - offset;
281             // Should not use sendmsg_zc, just use normal writev.
282             IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArrayAddress, iovArrayLength, nextOpsId());
283 
284             byte opCode = ops.opcode();
285             writeId = registration.submit(ops);
286             writeOpCode = opCode;
287             if (writeId == 0) {
288                 return 0;
289             }
290             return 1;
291         }
292 
        /**
         * Returns the {@link ChannelOutboundBuffer.MessageProcessor} used by
         * {@code scheduleWriteMultiple(...)} to visit the flushed messages. This implementation returns
         * the given {@code iovArray} itself; being non-final it can be replaced by subclasses.
         *
         * @param iovArray the array that collects the buffers to write
         * @return the processor to pass to {@code forEachFlushedMessage(...)}
         */
        protected ChannelOutboundBuffer.MessageProcessor filterWriteMultiple(IovArray iovArray) {
           return iovArray;
        }
296 
297         @Override
298         protected int scheduleWriteSingle(Object msg) {
299             assert writeId == 0;
300 
301             int fd = fd().intValue();
302             IoRegistration registration = registration();
303             final IoUringIoOps ops;
304             if (msg instanceof IoUringFileRegion) {
305                 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
306                 try {
307                     fileRegion.open();
308                 } catch (IOException e) {
309                     this.handleWriteError(e);
310                     return 0;
311                 }
312                 ops = fileRegion.splice(fd);
313             } else if (msg instanceof FileRegion) {
314                 return scheduleWriteFileRegion(fd, registration, (FileRegion) msg);
315             } else {
316                 ByteBuf buf = (ByteBuf) msg;
317                 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
318                 int length = buf.readableBytes();
319                 short opsid = nextOpsId();
320 
321                 ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, opsid);
322             }
323             byte opCode = ops.opcode();
324             writeId = registration.submit(ops);
325             writeOpCode = opCode;
326             if (writeId == 0) {
327                 return 0;
328             }
329             return 1;
330         }
331 
332         // Read a chunk from a generic FileRegion into a direct ByteBuf and submit it as an
333         // io_uring async send. If fileRegionChunkBuf is non-null, re-submits the remaining
334         // bytes from a previous partial/failed send.
335         private int scheduleWriteFileRegion(int fd, IoRegistration registration, FileRegion region) {
336             ByteBuf buf = fileRegionChunkBuf;
337             if (buf == null) {
338                 long remaining = region.count() - region.transferred();
339                 if (remaining > 0) {
340                     int chunkSize = (int) Math.min(remaining, FILE_REGION_MAX_CHUNK_SIZE);
341                     buf = alloc().directBuffer(chunkSize);
342                     try {
343                         ByteBufWritableByteChannel ch = new ByteBufWritableByteChannel(buf);
344                         while (buf.writableBytes() > 0) {
345                             long t = region.transferTo(ch, region.transferred());
346                             if (t <= 0) {
347                                 break;
348                             }
349                         }
350                         if (buf.readableBytes() == 0) {
351                             buf.release();
352                             handleWriteError(new ChannelException(
353                                     "FileRegion.transferTo(...) produced 0 bytes (count="
354                                             + region.count() + ", transferred=" + region.transferred() + ')'));
355                             return 0;
356                         }
357                     } catch (Exception e) {
358                         buf.release();
359                         handleWriteError(e);
360                         return 0;
361                     }
362                 } else {
363                     // Empty or fully-transferred region. Submit a 0-byte send so the completion
364                     // path removes it from the outbound buffer via the normal async flow.
365                     buf = alloc().directBuffer(0);
366                 }
367                 fileRegionChunkBuf = buf;
368             }
369             long address = IoUring.memoryAddress(buf) + buf.readerIndex();
370             int length = buf.readableBytes();
371             IoUringIoOps ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, nextOpsId());
372             byte opCode = ops.opcode();
373             writeId = registration.submit(ops);
374             writeOpCode = opCode;
375             if (writeId == 0) {
376                 // Submission only fails when the registration is no longer valid (channel is
377                 // being deregistered). unregistered() will release fileRegionChunkBuf and the
378                 // outbound buffer will release the FileRegion, so nothing to clean up here --
379                 // mirroring the plain ByteBuf path above.
380                 return 0;
381             }
382             return 1;
383         }
384 
385         private int calculateRecvFlags(boolean first) {
386             // Depending on if this is the first read or not we will use Native.MSG_DONTWAIT.
387             // The idea is that if the socket is blocking we can do the first read in a blocking fashion
388             // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
389             // be possible directly we schedule these with Native.MSG_DONTWAIT. This allows us to still be
390             // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
391             // transports.
392             if (first) {
393                 return 0;
394             }
395             return Native.MSG_DONTWAIT;
396         }
397 
398         private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
399             // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
400             // attempt.
401             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
402             if (first) {
403                 // IORING_RECVSEND_POLL_FIRST and IORING_CQE_F_SOCK_NONEMPTY were added in the same release (5.19).
404                 // We need to check if it's supported as otherwise providing these would result in an -EINVAL.
405                 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
406                         Native.IORING_RECVSEND_POLL_FIRST : 0;
407             }
408             return 0;
409         }
410 
411         @Override
412         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
413             assert readBuffer == null;
414             assert readId == 0 : readId;
415             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
416 
417             if (bufferRing != null && bufferRing.isUsable()) {
418                 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
419             }
420 
421             // We either have no buffer ring configured or we force a recv without using a buffer ring.
422             ByteBuf byteBuf = allocHandle.allocate(alloc());
423             try {
424                 int fd = fd().intValue();
425                 IoRegistration registration = registration();
426                 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
427                 int recvFlags = calculateRecvFlags(first);
428 
429                 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
430                         IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
431                 readId = registration.submit(ops);
432                 readOpCode = Native.IORING_OP_RECV;
433                 if (readId == 0) {
434                     return 0;
435                 }
436                 readBuffer = byteBuf;
437                 byteBuf = null;
438                 return 1;
439             } finally {
440                 if (byteBuf != null) {
441                     byteBuf.release();
442                 }
443             }
444         }
445 
446         private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
447             short bgId = bufferRing.bufferGroupId();
448             try {
449                 boolean multishot = IoUring.isRecvMultishotEnabled();
450                 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
451                 short ioPrio;
452                 final int recvFlags;
453                 if (multishot) {
454                     ioPrio = Native.IORING_RECV_MULTISHOT;
455                     recvFlags = 0;
456                 } else {
457                     // We should only use the calculate*() methods if this is not a multishot recv, as otherwise
458                     // the would be applied until the multishot will be re-armed.
459                     ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
460                     recvFlags = calculateRecvFlags(first);
461                 }
462                 if (IoUring.isRecvsendBundleEnabled()) {
463                     // See https://github.com/axboe/liburing/wiki/
464                     // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
465                     ioPrio |= Native.IORING_RECVSEND_BUNDLE;
466                 }
467                 IoRegistration registration = registration();
468                 int fd = fd().intValue();
469                 IoUringIoOps ops = IoUringIoOps.newRecv(
470                         fd, flags, ioPrio, recvFlags, 0,
471                         0, nextOpsId(), bgId
472                 );
473                 readId = registration.submit(ops);
474                 readOpCode = Native.IORING_OP_RECV;
475                 if (readId == 0) {
476                     return 0;
477                 }
478                 if (multishot) {
479                     // Return -1 to signal we used multishot and so expect multiple recvComplete(...) calls.
480                     return -1;
481                 }
482                 return 1;
483             } catch (IllegalArgumentException illegalArgumentException) {
484                 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
485                 return 0;
486             }
487         }
488 
        /**
         * Handles the completion of a previously submitted recv operation.
         *
         * <p>Depending on {@code res} this either releases the pending buffer (cancellation), retries the
         * read (no buffers left in the buffer ring), signals EOF, or passes the received data through the
         * pipeline and decides whether another read should be scheduled.
         *
         * @param op          the opcode of the completed operation (not used by this implementation)
         * @param res         the result of the completion: a negative errno-style value on failure,
         *                    {@code 0} on EOF, otherwise the number of bytes received
         * @param flags       the completion flags; evaluated for {@code IORING_CQE_F_MORE},
         *                    {@code IORING_CQE_F_BUFFER}, {@code IORING_CQE_F_BUF_MORE} and the buffer id
         * @param data        the data attached to the completion (not used by this implementation)
         * @param outstanding the number of outstanding operations (not used by this implementation)
         */
        @Override
        protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
            // Take ownership of the buffer of a recv that did not use the buffer ring (null otherwise).
            ByteBuf byteBuf = readBuffer;
            readBuffer = null;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                // The recv was cancelled: forget its id so a later scheduleRead(..) can submit a new one.
                readId = 0;
                if (byteBuf != null) {
                    // recv without buffer ring: the buffer was allocated by us and is not needed anymore.
                    byteBuf.release();
                }
                return;
            }
            // Without IORING_CQE_F_MORE no further completions will follow for this submission.
            boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
            boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
            // The id of the selected buffer is stored in the upper bits of the flags.
            short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
            boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;

            boolean empty = socketIsEmpty(flags);
            if (rearm) {
                // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
                readId = 0;
            }

            boolean allDataRead = false;

            final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
            final ChannelPipeline pipeline = pipeline();

            try {
                if (res < 0) {
                    if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
                        // try to expand the buffer ring by adding more buffers to it if there is any space left.
                        if (!bufferRing.expand()) {
                            // We couldn't expand the ring anymore so notify the user that we did run out of buffers
                            // without the ability to expand it.
                            // If this happens too often the user should most likely increase the buffer ring size.
                            pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
                        }

                        // Let's trigger a read again without consulting the RecvByteBufAllocator.Handle as
                        // we can't count this as a "real" read operation.
                        // Because of how our BufferRing works we should have it filled again.
                        scheduleRead(allocHandle.isFirstRead());
                        return;
                    }

                    // If res is negative we should pass it to ioResult(...) which will either throw
                    // or convert it to 0 if we could not read because the socket was not readable.
                    allocHandle.lastBytesRead(ioResult("io_uring read", res));
                } else if (res > 0) {
                    if (useBufferRing) {
                        // If RECVSEND_BUNDLE is used we need to do a bit more work here.
                        // In this case we might need to obtain multiple buffers out of the buffer ring as
                        // multiple of them might have been filled for one recv operation.
                        // See https://github.com/axboe/liburing/wiki/
                        // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
                        // Number of received bytes that were not yet assigned to a buffer.
                        int read = res;
                        for (;;) {
                            int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
                            byteBuf = bufferRing.useBuffer(bid, read, more);
                            read -= byteBuf.readableBytes();
                            allocHandle.attemptedBytesRead(attemptedBytesRead);
                            allocHandle.lastBytesRead(byteBuf.readableBytes());

                            assert read >= 0;
                            if (read == 0) {
                                // Just break here, we will handle the byteBuf below and also fill the bufferRing
                                // if needed later.
                                break;
                            }
                            // More buffers follow: pass this one on and move to the next buffer id.
                            allocHandle.incMessagesRead(1);
                            pipeline.fireChannelRead(byteBuf);
                            byteBuf = null;
                            bid = bufferRing.nextBid(bid);
                            if (!allocHandle.continueReading()) {
                                // We should call fireChannelReadComplete() to mimic a normal read loop.
                                allocHandle.readComplete();
                                pipeline.fireChannelReadComplete();
                                allocHandle.reset(config());
                            }
                        }
                    } else {
                        // The data was written directly into our own buffer; make it visible to readers.
                        int attemptedBytesRead = byteBuf.writableBytes();
                        byteBuf.writerIndex(byteBuf.writerIndex() + res);
                        allocHandle.attemptedBytesRead(attemptedBytesRead);
                        allocHandle.lastBytesRead(res);
                    }
                } else {
                    // EOF which we signal with -1.
                    allocHandle.lastBytesRead(-1);
                }
                if (allocHandle.lastBytesRead() <= 0) {
                    // byteBuf might be null if we used a buffer ring.
                    if (byteBuf != null) {
                        // nothing was read, release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                    }
                    allDataRead = allocHandle.lastBytesRead() < 0;
                    if (allDataRead) {
                        // There is nothing left to read as we received an EOF.
                        shutdownInput(true);
                    }
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
                    return;
                }

                // Pass on the (last) buffer; it is set to null so the catch block does not handle it again.
                allocHandle.incMessagesRead(1);
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
                scheduleNextRead(pipeline, allocHandle, rearm, empty);
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
            }
        }
607 
608         private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
609                                       boolean rearm, boolean empty) {
610             if (allocHandle.continueReading() && !empty) {
611                 if (rearm) {
612                     // We only should schedule another read if we need to rearm.
613                     // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
614                     scheduleRead(false);
615                 }
616             } else {
617                 // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
618                 allocHandle.readComplete();
619                 pipeline.fireChannelReadComplete();
620             }
621         }
622 
623         protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
624                                          Throwable cause, boolean allDataRead,
625                                          IoUringRecvByteAllocatorHandle allocHandle) {
626             if (byteBuf != null) {
627                 if (byteBuf.isReadable()) {
628                     pipeline.fireChannelRead(byteBuf);
629                 } else {
630                     byteBuf.release();
631                 }
632             }
633             allocHandle.readComplete();
634             pipeline.fireChannelReadComplete();
635             pipeline.fireExceptionCaught(cause);
636             if (allDataRead || cause instanceof IOException) {
637                 shutdownInput(true);
638             }
639         }
640 
641         private boolean handleWriteCompleteFileRegion(ChannelOutboundBuffer channelOutboundBuffer,
642                                                       IoUringFileRegion fileRegion, int res, short data) {
643             try {
644                 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
645                     return true;
646                 }
647                 int result = res >= 0 ? res : ioResult("io_uring splice", res);
648                 if (result == 0 && fileRegion.count() > 0) {
649                     validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
650                     return false;
651                 }
652                 int progress = fileRegion.handleResult(result, data);
653                 if (progress == -1) {
654                     // Done with writing
655                     channelOutboundBuffer.remove();
656                 } else if (progress > 0) {
657                     channelOutboundBuffer.progress(progress);
658                 }
659             } catch (Throwable cause) {
660                 handleWriteError(cause);
661             }
662             return true;
663         }
664 
665         @Override
666         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
667             if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
668                 // We only want to reset these if IORING_CQE_F_NOTIF is not set.
669                 // If it's set we know this is only an extra notification for a write but we already handled
670                 // the write completions before.
671                 // See https://man7.org/linux/man-pages/man2/io_uring_enter.2.html section: IORING_OP_SEND_ZC
672                 writeId = 0;
673                 writeOpCode = 0;
674             }
675             ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
676             Object current = channelOutboundBuffer.current();
677             if (current instanceof IoUringFileRegion) {
678                 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
679                 return handleWriteCompleteFileRegion(channelOutboundBuffer, fileRegion, res, data);
680             }
681 
682             if (current instanceof FileRegion) {
683                 return handleWriteCompleteGenericFileRegion(
684                         channelOutboundBuffer, (FileRegion) current, res);
685             }
686 
687             if (res >= 0) {
688                 channelOutboundBuffer.removeBytes(res);
689             } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
690                 return true;
691             } else {
692                 try {
693                     if (ioResult("io_uring write", res) == 0) {
694                         return false;
695                     }
696                 } catch (Throwable cause) {
697                     handleWriteError(cause);
698                 }
699             }
700             return true;
701         }
702 
703         // Returns true when the completion can be treated as "written all" for this SQE
704         // (the framework may still schedule further writes from the outbound buffer); returns
705         // false to signal the framework that POLLOUT should be armed so the chunk buffer is
706         // resubmitted once the socket becomes writable again.
707         private boolean handleWriteCompleteGenericFileRegion(
708                 ChannelOutboundBuffer channelOutboundBuffer, FileRegion region, int res) {
709             try {
710                 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
711                     releaseFileRegionChunkBuf();
712                     return true;
713                 }
714                 if (res >= 0) {
715                     ByteBuf buf = fileRegionChunkBuf;
716                     assert buf != null;
717                     buf.skipBytes(res);
718                     channelOutboundBuffer.progress(res);
719                     if (!buf.isReadable()) {
720                         // Chunk fully sent.
721                         releaseFileRegionChunkBuf();
722                         if (region.transferred() >= region.count()) {
723                             channelOutboundBuffer.remove();
724                         }
725                     } else {
726                         // Partial send -- schedule POLLOUT to re-send the remainder.
727                         return false;
728                     }
729                 } else {
730                     // Keep the chunk buffer -- on retryable errors (EAGAIN) ioResult returns 0
731                     // and scheduleWriteFileRegion() will re-submit the same fileRegionChunkBuf
732                     // once POLLOUT fires. On a non-retryable error ioResult throws, and the
733                     // outer catch releases the buffer.
734                     if (ioResult("io_uring write", res) == 0) {
735                         return false;
736                     }
737                 }
738             } catch (Throwable cause) {
739                 releaseFileRegionChunkBuf();
740                 handleWriteError(cause);
741             }
742             return true;
743         }
744 
745         private void releaseFileRegionChunkBuf() {
746             if (fileRegionChunkBuf != null) {
747                 fileRegionChunkBuf.release();
748                 fileRegionChunkBuf = null;
749             }
750         }
751 
752         @Override
753         public void unregistered() {
754             super.unregistered();
755             assert readBuffer == null;
756             releaseFileRegionChunkBuf();
757         }
758     }
759 
760     @Override
761     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
762         if (readId != 0) {
763             // Let's try to cancel outstanding reads as these might be submitted and waiting for data (via fastpoll).
764             assert numOutstandingReads == 1 || numOutstandingReads == -1;
765             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
766             long id = registration.submit(ops);
767             assert id != 0;
768             readId = 0;
769         }
770     }
771 
772     @Override
773     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
774         if (writeId != 0) {
775             // Let's try to cancel outstanding writes as these might be submitted and waiting to finish writing
776             // (via fastpoll).
777             assert numOutstandingWrites == 1;
778             assert writeOpCode != 0;
779             long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
780             assert id != 0;
781             writeId = 0;
782         }
783     }
784 
785     @Override
786     protected boolean socketIsEmpty(int flags) {
787         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
788     }
789 
790     @Override
791     boolean isPollInFirst() {
792         return bufferRing == null || !bufferRing.isUsable();
793     }
794 
795     /**
796      * A {@link WritableByteChannel} backed by a {@link ByteBuf}.
797      * Writes are capped to {@link ByteBuf#writableBytes()} to prevent overflow
798      * when {@link FileRegion#transferTo} writes more than the chunk size.
799      */
800     private static final class ByteBufWritableByteChannel implements WritableByteChannel {
801         private final ByteBuf buf;
802 
803         ByteBufWritableByteChannel(ByteBuf buf) {
804             this.buf = buf;
805         }
806 
807         @Override
808         public int write(ByteBuffer src) {
809             int toWrite = Math.min(src.remaining(), buf.writableBytes());
810             if (toWrite == 0) {
811                 return 0;
812             }
813             if (toWrite < src.remaining()) {
814                 int oldLimit = src.limit();
815                 src.limit(src.position() + toWrite);
816                 buf.writeBytes(src);
817                 src.limit(oldLimit);
818                 return toWrite;
819             }
820             buf.writeBytes(src);
821             return toWrite;
822         }
823 
824         @Override
825         public boolean isOpen() {
826             return true;
827         }
828 
829         @Override
830         public void close() {
831             // NOOP
832         }
833     }
834 }