View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelMetadata;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.channel.DefaultFileRegion;
28  import io.netty.channel.EventLoop;
29  import io.netty.channel.FileRegion;
30  import io.netty.channel.IoRegistration;
31  import io.netty.channel.socket.DuplexChannel;
32  import io.netty.channel.unix.IovArray;
33  import io.netty.util.internal.SystemPropertyUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.io.IOException;
38  import java.net.SocketAddress;
39  import java.nio.ByteBuffer;
40  import java.nio.channels.WritableByteChannel;
41  
42  import static io.netty.channel.unix.Errors.ioResult;
43  
44  abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
45      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
46      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
47  
48      /**
49       * Maximum bytes per chunk when converting a generic {@link FileRegion} to a {@link ByteBuf}
50       * for the io_uring async send path. Overridable via the {@code io.netty.iouring.fileRegionChunkSize}
51       * system property; capped at 16 MiB to guard against pathological configurations that would
52       * risk direct-memory OOM.
53       */
54      private static final int FILE_REGION_MAX_CHUNK_SIZE = Math.min(16 * 1024 * 1024,
55              Math.max(1, SystemPropertyUtil.getInt("io.netty.iouring.fileRegionChunkSize", 64 * 1024)));
56  
57      // Store the opCode so we know if we used WRITE or WRITEV.
58      byte writeOpCode;
59      // Keep track of the ids used for write and read so we can cancel these when needed.
60      long writeId;
61      byte readOpCode;
62      long readId;
63  
64      // The configured buffer ring if any
65      private IoUringBufferRing bufferRing;
66  
67      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
68          super(parent, socket, active);
69      }
70  
71      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
72          super(parent, socket, remote);
73      }
74  
75      @Override
76      protected final boolean isStreamSocket() {
77          return true;
78      }
79  
80      @Override
81      public ChannelMetadata metadata() {
82          return METADATA;
83      }
84  
85      @Override
86      protected AbstractUringUnsafe newUnsafe() {
87          return new IoUringStreamUnsafe();
88      }
89  
90      @Override
91      public final ChannelFuture shutdown() {
92          return shutdown(newPromise());
93      }
94  
95      @Override
96      public final ChannelFuture shutdown(final ChannelPromise promise) {
97          ChannelFuture shutdownOutputFuture = shutdownOutput();
98          if (shutdownOutputFuture.isDone()) {
99              shutdownOutputDone(shutdownOutputFuture, promise);
100         } else {
101             shutdownOutputFuture.addListener(new ChannelFutureListener() {
102                 @Override
103                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
104                     shutdownOutputDone(shutdownOutputFuture, promise);
105                 }
106             });
107         }
108         return promise;
109     }
110 
111     @Override
112     protected final void doShutdownOutput() throws Exception {
113         socket.shutdown(false, true);
114     }
115 
116     private void shutdownInput0(final ChannelPromise promise) {
117         try {
118             socket.shutdown(true, false);
119             promise.setSuccess();
120         } catch (Throwable cause) {
121             promise.setFailure(cause);
122         }
123     }
124 
125     @Override
126     public final boolean isOutputShutdown() {
127         return socket.isOutputShutdown();
128     }
129 
130     @Override
131     public final boolean isInputShutdown() {
132         return socket.isInputShutdown();
133     }
134 
135     @Override
136     public final boolean isShutdown() {
137         return socket.isShutdown();
138     }
139 
140     @Override
141     public final ChannelFuture shutdownOutput() {
142         return shutdownOutput(newPromise());
143     }
144 
145     @Override
146     public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
147         EventLoop loop = eventLoop();
148         if (loop.inEventLoop()) {
149             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
150         } else {
151             loop.execute(new Runnable() {
152                 @Override
153                 public void run() {
154                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
155                 }
156             });
157         }
158 
159         return promise;
160     }
161 
162     @Override
163     public final ChannelFuture shutdownInput() {
164         return shutdownInput(newPromise());
165     }
166 
167     @Override
168     public final ChannelFuture shutdownInput(final ChannelPromise promise) {
169         EventLoop loop = eventLoop();
170         if (loop.inEventLoop()) {
171             shutdownInput0(promise);
172         } else {
173             loop.execute(new Runnable() {
174                 @Override
175                 public void run() {
176                     shutdownInput0(promise);
177                 }
178             });
179         }
180         return promise;
181     }
182 
183     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
184         ChannelFuture shutdownInputFuture = shutdownInput();
185         if (shutdownInputFuture.isDone()) {
186             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
187         } else {
188             shutdownInputFuture.addListener(new ChannelFutureListener() {
189                 @Override
190                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
191                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
192                 }
193             });
194         }
195     }
196 
197     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
198                                      ChannelFuture shutdownInputFuture,
199                                      ChannelPromise promise) {
200         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
201         Throwable shutdownInputCause = shutdownInputFuture.cause();
202         if (shutdownOutputCause != null) {
203             if (shutdownInputCause != null) {
204                 logger.info("Exception suppressed because a previous exception occurred.",
205                              shutdownInputCause);
206             }
207             promise.setFailure(shutdownOutputCause);
208         } else if (shutdownInputCause != null) {
209             promise.setFailure(shutdownInputCause);
210         } else {
211             promise.setSuccess();
212         }
213     }
214 
215     @Override
216     protected final void doRegister(ChannelPromise promise) {
217         ChannelPromise registerPromise = this.newPromise();
218         // Ensure that the buffer group is properly set before channel::read
219         registerPromise.addListener(f -> {
220             if (f.isSuccess()) {
221                try {
222                    short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
223                    if (bgid >= 0) {
224                        final IoUringIoHandler ioUringIoHandler = registration().attachment();
225                        bufferRing = ioUringIoHandler.findBufferRing(bgid);
226                    }
227                    if (active) {
228                        // Register for POLLRDHUP if this channel is already considered active.
229                        schedulePollRdHup();
230                    }
231                } finally {
232                    promise.setSuccess();
233                }
234             } else {
235                 promise.setFailure(f.cause());
236             }
237         });
238 
239         super.doRegister(registerPromise);
240     }
241 
242     @Override
243     protected Object filterOutboundMessage(Object msg) {
244         if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
245             return new IoUringFileRegion((DefaultFileRegion) msg);
246         }
247 
248         if (msg instanceof FileRegion) {
249             // Generic FileRegion -- pass through to the write path for chunked conversion.
250             return msg;
251         }
252 
253         return super.filterOutboundMessage(msg);
254     }
255 
256     protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
257 
258         private ByteBuf readBuffer;
259 
260         // Chunk buffer for generic FileRegion writes. Non-null while a send is in flight.
261         private ByteBuf fileRegionChunkBuf;
262 
263         @Override
264         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
265             assert writeId == 0;
266 
267             int fd = fd().intValue();
268             IoRegistration registration = registration();
269             IoUringIoHandler handler = registration.attachment();
270             IovArray iovArray = handler.iovArray();
271             int offset = iovArray.count();
272 
273             try {
274                 in.forEachFlushedMessage(filterWriteMultiple(iovArray));
275             } catch (Exception e) {
276                 // This should never happen, anyway fallback to single write.
277                 return scheduleWriteSingle(in.current());
278             }
279             long iovArrayAddress = iovArray.memoryAddress(offset);
280             int iovArrayLength = iovArray.count() - offset;
281             // Should not use sendmsg_zc, just use normal writev.
282             IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArrayAddress, iovArrayLength, nextOpsId());
283 
284             byte opCode = ops.opcode();
285             writeId = registration.submit(ops);
286             writeOpCode = opCode;
287             if (writeId == 0) {
288                 return 0;
289             }
290             return 1;
291         }
292 
293         protected ChannelOutboundBuffer.MessageProcessor filterWriteMultiple(IovArray iovArray) {
294            return iovArray;
295         }
296 
297         @Override
298         protected int scheduleWriteSingle(Object msg) {
299             assert writeId == 0;
300 
301             int fd = fd().intValue();
302             IoRegistration registration = registration();
303             final IoUringIoOps ops;
304             if (msg instanceof IoUringFileRegion) {
305                 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
306                 try {
307                     fileRegion.open();
308                 } catch (IOException e) {
309                     this.handleWriteError(e);
310                     return 0;
311                 }
312                 ops = fileRegion.splice(fd);
313             } else if (msg instanceof FileRegion) {
314                 return scheduleWriteFileRegion(fd, registration, (FileRegion) msg);
315             } else {
316                 ByteBuf buf = (ByteBuf) msg;
317                 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
318                 int length = buf.readableBytes();
319                 short opsid = nextOpsId();
320 
321                 ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, opsid);
322             }
323             byte opCode = ops.opcode();
324             writeId = registration.submit(ops);
325             writeOpCode = opCode;
326             if (writeId == 0) {
327                 return 0;
328             }
329             return 1;
330         }
331 
332         // Read a chunk from a generic FileRegion into a direct ByteBuf and submit it as an
333         // io_uring async send. If fileRegionChunkBuf is non-null, re-submits the remaining
334         // bytes from a previous partial/failed send.
335         private int scheduleWriteFileRegion(int fd, IoRegistration registration, FileRegion region) {
336             ByteBuf buf = fileRegionChunkBuf;
337             if (buf == null) {
338                 long remaining = region.count() - region.transferred();
339                 if (remaining > 0) {
340                     int chunkSize = (int) Math.min(remaining, FILE_REGION_MAX_CHUNK_SIZE);
341                     buf = alloc().directBuffer(chunkSize);
342                     try {
343                         ByteBufWritableByteChannel ch = new ByteBufWritableByteChannel(buf);
344                         // Mirror epoll's writeFileRegion(): stop calling transferTo() once
345                         // the region reports it has been fully transferred. The FileRegion
346                         // contract permits implementations to assume no further invocations
347                         // past transferred() == count().
348                         while (buf.writableBytes() > 0 && region.transferred() < region.count()) {
349                             long t = region.transferTo(ch, region.transferred());
350                             if (t <= 0) {
351                                 break;
352                             }
353                         }
354                         if (buf.readableBytes() == 0) {
355                             buf.release();
356                             handleWriteError(new ChannelException(
357                                     "FileRegion.transferTo(...) produced 0 bytes (count="
358                                             + region.count() + ", transferred=" + region.transferred() + ')'));
359                             return 0;
360                         }
361                     } catch (Exception e) {
362                         buf.release();
363                         handleWriteError(e);
364                         return 0;
365                     }
366                 } else {
367                     // Empty or fully-transferred region. Submit a 0-byte send so the completion
368                     // path removes it from the outbound buffer via the normal async flow.
369                     buf = alloc().directBuffer(0);
370                 }
371                 fileRegionChunkBuf = buf;
372             }
373             long address = IoUring.memoryAddress(buf) + buf.readerIndex();
374             int length = buf.readableBytes();
375             IoUringIoOps ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, nextOpsId());
376             byte opCode = ops.opcode();
377             writeId = registration.submit(ops);
378             writeOpCode = opCode;
379             if (writeId == 0) {
380                 // Submission only fails when the registration is no longer valid (channel is
381                 // being deregistered). unregistered() will release fileRegionChunkBuf and the
382                 // outbound buffer will release the FileRegion, so nothing to clean up here --
383                 // mirroring the plain ByteBuf path above.
384                 return 0;
385             }
386             return 1;
387         }
388 
389         private int calculateRecvFlags(boolean first) {
390             // Depending on if this is the first read or not we will use Native.MSG_DONTWAIT.
391             // The idea is that if the socket is blocking we can do the first read in a blocking fashion
392             // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
393             // be possible directly we schedule these with Native.MSG_DONTWAIT. This allows us to still be
394             // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
395             // transports.
396             if (first) {
397                 return 0;
398             }
399             return Native.MSG_DONTWAIT;
400         }
401 
402         private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
403             // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
404             // attempt.
405             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
406             if (first) {
407                 // IORING_RECVSEND_POLL_FIRST and IORING_CQE_F_SOCK_NONEMPTY were added in the same release (5.19).
408                 // We need to check if it's supported as otherwise providing these would result in an -EINVAL.
409                 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
410                         Native.IORING_RECVSEND_POLL_FIRST : 0;
411             }
412             return 0;
413         }
414 
415         @Override
416         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
417             assert readBuffer == null;
418             assert readId == 0 : readId;
419             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
420 
421             if (bufferRing != null && bufferRing.isUsable()) {
422                 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
423             }
424 
425             // We either have no buffer ring configured or we force a recv without using a buffer ring.
426             ByteBuf byteBuf = allocHandle.allocate(alloc());
427             try {
428                 int fd = fd().intValue();
429                 IoRegistration registration = registration();
430                 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
431                 int recvFlags = calculateRecvFlags(first);
432 
433                 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
434                         IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
435                 readId = registration.submit(ops);
436                 readOpCode = Native.IORING_OP_RECV;
437                 if (readId == 0) {
438                     return 0;
439                 }
440                 readBuffer = byteBuf;
441                 byteBuf = null;
442                 return 1;
443             } finally {
444                 if (byteBuf != null) {
445                     byteBuf.release();
446                 }
447             }
448         }
449 
450         private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
451             short bgId = bufferRing.bufferGroupId();
452             try {
453                 boolean multishot = IoUring.isRecvMultishotEnabled();
454                 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
455                 short ioPrio;
456                 final int recvFlags;
457                 if (multishot) {
458                     ioPrio = Native.IORING_RECV_MULTISHOT;
459                     recvFlags = 0;
460                 } else {
461                     // We should only use the calculate*() methods if this is not a multishot recv, as otherwise
462                     // the would be applied until the multishot will be re-armed.
463                     ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
464                     recvFlags = calculateRecvFlags(first);
465                 }
466                 if (IoUring.isRecvsendBundleEnabled()) {
467                     // See https://github.com/axboe/liburing/wiki/
468                     // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
469                     ioPrio |= Native.IORING_RECVSEND_BUNDLE;
470                 }
471                 IoRegistration registration = registration();
472                 int fd = fd().intValue();
473                 IoUringIoOps ops = IoUringIoOps.newRecv(
474                         fd, flags, ioPrio, recvFlags, 0,
475                         0, nextOpsId(), bgId
476                 );
477                 readId = registration.submit(ops);
478                 readOpCode = Native.IORING_OP_RECV;
479                 if (readId == 0) {
480                     return 0;
481                 }
482                 if (multishot) {
483                     // Return -1 to signal we used multishot and so expect multiple recvComplete(...) calls.
484                     return -1;
485                 }
486                 return 1;
487             } catch (IllegalArgumentException illegalArgumentException) {
488                 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
489                 return 0;
490             }
491         }
492 
493         @Override
494         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
495             ByteBuf byteBuf = readBuffer;
496             readBuffer = null;
497             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
498                 readId = 0;
499                 // In case of cancellation we should reset the last used buffer ring to null as we will select a new one
500                 // when calling scheduleRead(..)
501                 if (byteBuf != null) {
502                     //recv without buffer ring
503                     byteBuf.release();
504                 }
505                 return;
506             }
507             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
508             boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
509             short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
510             boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;
511 
512             boolean empty = socketIsEmpty(flags);
513             if (rearm) {
514                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
515                 readId = 0;
516             }
517 
518             boolean allDataRead = false;
519 
520             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
521             final ChannelPipeline pipeline = pipeline();
522 
523             try {
524                 if (res < 0) {
525                     if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
526                         // try to expand the buffer ring by adding more buffers to it if there is any space left.
527                         if (!bufferRing.expand()) {
528                             // We couldn't expand the ring anymore so notify the user that we did run out of buffers
529                             // without the ability to expand it.
530                             // If this happens to often the user should most likely increase the buffer ring size.
531                             pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
532                         }
533 
534                         // Let's trigger a read again without consulting the RecvByteBufAllocator.Handle as
535                         // we can't count this as a "real" read operation.
536                         // Because of how our BufferRing works we should have it filled again.
537                         scheduleRead(allocHandle.isFirstRead());
538                         return;
539                     }
540 
541                     // If res is negative we should pass it to ioResult(...) which will either throw
542                     // or convert it to 0 if we could not read because the socket was not readable.
543                     allocHandle.lastBytesRead(ioResult("io_uring read", res));
544                 } else if (res > 0) {
545                     if (useBufferRing) {
546                         // If RECVSEND_BUNDLE is used we need to do a bit more work here.
547                         // In this case we might need to obtain multiple buffers out of the buffer ring as
548                         // multiple of them might have been filled for one recv operation.
549                         // See https://github.com/axboe/liburing/wiki/
550                         // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
551                         int read = res;
552                         for (;;) {
553                             int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
554                             byteBuf = bufferRing.useBuffer(bid, read, more);
555                             read -= byteBuf.readableBytes();
556                             allocHandle.attemptedBytesRead(attemptedBytesRead);
557                             allocHandle.lastBytesRead(byteBuf.readableBytes());
558 
559                             assert read >= 0;
560                             if (read == 0) {
561                                 // Just break here, we will handle the byteBuf below and also fill the bufferRing
562                                 // if needed later.
563                                 break;
564                             }
565                             allocHandle.incMessagesRead(1);
566                             pipeline.fireChannelRead(byteBuf);
567                             byteBuf = null;
568                             bid = bufferRing.nextBid(bid);
569                             if (!allocHandle.continueReading()) {
570                                 // We should call fireChannelReadComplete() to mimic a normal read loop.
571                                 allocHandle.readComplete();
572                                 pipeline.fireChannelReadComplete();
573                                 allocHandle.reset(config());
574                             }
575                         }
576                     } else {
577                         int attemptedBytesRead = byteBuf.writableBytes();
578                         byteBuf.writerIndex(byteBuf.writerIndex() + res);
579                         allocHandle.attemptedBytesRead(attemptedBytesRead);
580                         allocHandle.lastBytesRead(res);
581                     }
582                 } else {
583                     // EOF which we signal with -1.
584                     allocHandle.lastBytesRead(-1);
585                 }
586                 if (allocHandle.lastBytesRead() <= 0) {
587                     // byteBuf might be null if we used a buffer ring.
588                     if (byteBuf != null) {
589                         // nothing was read, release the buffer.
590                         byteBuf.release();
591                         byteBuf = null;
592                     }
593                     allDataRead = allocHandle.lastBytesRead() < 0;
594                     if (allDataRead) {
595                         // There is nothing left to read as we received an EOF.
596                         shutdownInput(true);
597                     }
598                     allocHandle.readComplete();
599                     pipeline.fireChannelReadComplete();
600                     return;
601                 }
602 
603                 allocHandle.incMessagesRead(1);
604                 pipeline.fireChannelRead(byteBuf);
605                 byteBuf = null;
606                 scheduleNextRead(pipeline, allocHandle, rearm, empty);
607             } catch (Throwable t) {
608                 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
609             }
610         }
611 
612         private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
613                                       boolean rearm, boolean empty) {
614             if (allocHandle.continueReading() && !empty) {
615                 if (rearm) {
616                     // We only should schedule another read if we need to rearm.
617                     // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
618                     scheduleRead(false);
619                 }
620             } else {
621                 // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
622                 allocHandle.readComplete();
623                 pipeline.fireChannelReadComplete();
624             }
625         }
626 
627         protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
628                                          Throwable cause, boolean allDataRead,
629                                          IoUringRecvByteAllocatorHandle allocHandle) {
630             if (byteBuf != null) {
631                 if (byteBuf.isReadable()) {
632                     pipeline.fireChannelRead(byteBuf);
633                 } else {
634                     byteBuf.release();
635                 }
636             }
637             allocHandle.readComplete();
638             pipeline.fireChannelReadComplete();
639             pipeline.fireExceptionCaught(cause);
640             if (allDataRead || cause instanceof IOException) {
641                 shutdownInput(true);
642             }
643         }
644 
645         private boolean handleWriteCompleteFileRegion(ChannelOutboundBuffer channelOutboundBuffer,
646                                                       IoUringFileRegion fileRegion, int res, short data) {
647             try {
648                 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
649                     return true;
650                 }
651                 int result = res >= 0 ? res : ioResult("io_uring splice", res);
652                 if (result == 0 && fileRegion.count() > 0) {
653                     validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
654                     return false;
655                 }
656                 int progress = fileRegion.handleResult(result, data);
657                 if (progress == -1) {
658                     // Done with writing
659                     channelOutboundBuffer.remove();
660                 } else if (progress > 0) {
661                     channelOutboundBuffer.progress(progress);
662                 }
663             } catch (Throwable cause) {
664                 handleWriteError(cause);
665             }
666             return true;
667         }
668 
669         @Override
670         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
671             if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
672                 // We only want to reset these if IORING_CQE_F_NOTIF is not set.
673                 // If it's set we know this is only an extra notification for a write but we already handled
674                 // the write completions before.
675                 // See https://man7.org/linux/man-pages/man2/io_uring_enter.2.html section: IORING_OP_SEND_ZC
676                 writeId = 0;
677                 writeOpCode = 0;
678             }
679             ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
680             Object current = channelOutboundBuffer.current();
681             if (current instanceof IoUringFileRegion) {
682                 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
683                 return handleWriteCompleteFileRegion(channelOutboundBuffer, fileRegion, res, data);
684             }
685 
686             if (current instanceof FileRegion) {
687                 return handleWriteCompleteGenericFileRegion(
688                         channelOutboundBuffer, (FileRegion) current, res);
689             }
690 
691             if (res >= 0) {
692                 channelOutboundBuffer.removeBytes(res);
693             } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
694                 return true;
695             } else {
696                 try {
697                     if (ioResult("io_uring write", res) == 0) {
698                         return false;
699                     }
700                 } catch (Throwable cause) {
701                     handleWriteError(cause);
702                 }
703             }
704             return true;
705         }
706 
        /**
         * Handles a write completion while the current outbound message is a {@link FileRegion} that is not an
         * {@link IoUringFileRegion}. Its bytes are sent from the intermediate {@code fileRegionChunkBuf}.
         * <p>
         * Returns {@code true} when the completion can be treated as "written all" for this SQE (the framework
         * may still schedule further writes from the outbound buffer). Returns {@code false} to signal the
         * framework that POLLOUT should be armed, so the chunk buffer is resubmitted once the socket becomes
         * writable again.
         *
         * @param channelOutboundBuffer the outbound buffer whose current message is {@code region}.
         * @param region                the file region being transferred chunk by chunk.
         * @param res                   the raw CQE result of the write of the current chunk.
         * @return {@code false} to request a POLLOUT-driven resend, {@code true} otherwise.
         */
        private boolean handleWriteCompleteGenericFileRegion(
                ChannelOutboundBuffer channelOutboundBuffer, FileRegion region, int res) {
            try {
                if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                    // The write was cancelled: drop the pending chunk, there is nothing to account for.
                    releaseFileRegionChunkBuf();
                    return true;
                }
                if (res >= 0) {
                    ByteBuf buf = fileRegionChunkBuf;
                    assert buf != null;
                    // Consume the bytes the kernel accepted and report them as progress.
                    buf.skipBytes(res);
                    channelOutboundBuffer.progress(res);
                    if (!buf.isReadable()) {
                        // Chunk fully sent.
                        releaseFileRegionChunkBuf();
                        // Remove the region only once all of its bytes have been transferred.
                        if (region.transferred() >= region.count()) {
                            channelOutboundBuffer.remove();
                        }
                    } else {
                        // Partial send -- schedule POLLOUT to re-send the remainder.
                        return false;
                    }
                } else {
                    // Keep the chunk buffer -- on retryable errors (EAGAIN) ioResult returns 0
                    // and scheduleWriteFileRegion() will re-submit the same fileRegionChunkBuf
                    // once POLLOUT fires. On a non-retryable error ioResult throws, and the
                    // outer catch releases the buffer.
                    if (ioResult("io_uring write", res) == 0) {
                        return false;
                    }
                }
            } catch (Throwable cause) {
                // Release the chunk before reporting the failure so it is not leaked.
                releaseFileRegionChunkBuf();
                handleWriteError(cause);
            }
            return true;
        }
748 
749         private void releaseFileRegionChunkBuf() {
750             if (fileRegionChunkBuf != null) {
751                 fileRegionChunkBuf.release();
752                 fileRegionChunkBuf = null;
753             }
754         }
755 
756         @Override
757         public void unregistered() {
758             super.unregistered();
759             assert readBuffer == null;
760             releaseFileRegionChunkBuf();
761         }
762     }
763 
764     @Override
765     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
766         if (readId != 0) {
767             // Let's try to cancel outstanding reads as these might be submitted and waiting for data (via fastpoll).
768             assert numOutstandingReads == 1 || numOutstandingReads == -1;
769             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
770             long id = registration.submit(ops);
771             assert id != 0;
772             readId = 0;
773         }
774     }
775 
776     @Override
777     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
778         if (writeId != 0) {
779             // Let's try to cancel outstanding writes as these might be submitted and waiting to finish writing
780             // (via fastpoll).
781             assert numOutstandingWrites == 1;
782             assert writeOpCode != 0;
783             long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
784             assert id != 0;
785             writeId = 0;
786         }
787     }
788 
789     @Override
790     protected boolean socketIsEmpty(int flags) {
791         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
792     }
793 
794     @Override
795     boolean isPollInFirst() {
796         return bufferRing == null || !bufferRing.isUsable();
797     }
798 
799     /**
800      * A {@link WritableByteChannel} backed by a {@link ByteBuf}.
801      * Writes are capped to {@link ByteBuf#writableBytes()} to prevent overflow
802      * when {@link FileRegion#transferTo} writes more than the chunk size.
803      */
804     private static final class ByteBufWritableByteChannel implements WritableByteChannel {
805         private final ByteBuf buf;
806 
807         ByteBufWritableByteChannel(ByteBuf buf) {
808             this.buf = buf;
809         }
810 
811         @Override
812         public int write(ByteBuffer src) {
813             int toWrite = Math.min(src.remaining(), buf.writableBytes());
814             if (toWrite == 0) {
815                 return 0;
816             }
817             if (toWrite < src.remaining()) {
818                 int oldLimit = src.limit();
819                 src.limit(src.position() + toWrite);
820                 buf.writeBytes(src);
821                 src.limit(oldLimit);
822                 return toWrite;
823             }
824             buf.writeBytes(src);
825             return toWrite;
826         }
827 
828         @Override
829         public boolean isOpen() {
830             return true;
831         }
832 
833         @Override
834         public void close() {
835             // NOOP
836         }
837     }
838 }