View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultFileRegion;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.IoRegistration;
29  import io.netty.channel.socket.DuplexChannel;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.channel.unix.Limits;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.io.IOException;
36  import java.net.SocketAddress;
37  
38  import static io.netty.channel.unix.Errors.ioResult;
39  
40  abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
    /** Logger used by this class; currently only {@code shutdownDone(...)} logs through it. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
    /** Metadata instance shared by all stream channels and returned from {@link #metadata()}. */
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

    // Store the opCode of the last submitted write (WRITE, WRITEV or the op produced for a file region), so the
    // matching opcode can be passed along when the outstanding write is cancelled.
    private byte writeOpCode;
    // Keep track of the ids used for write and read so we can cancel these when needed.
    // Both are reset to 0 once the corresponding operation is no longer outstanding.
    private long writeId;
    private long readId;

    // The configured buffer ring if any. It is looked up after a successful registration when the configured
    // buffer group id is non-negative, and stays null otherwise.
    private IoUringBufferRing bufferRing;
52  
    /**
     * Creates a new stream channel by delegating to the super class.
     *
     * @param parent the parent {@link Channel}, passed through unchanged
     * @param socket the {@link LinuxSocket} backing this channel
     * @param active passed through unchanged; presumably whether the socket is already active - confirm in
     *               {@code AbstractIoUringChannel}
     */
    AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
        super(parent, socket, active);
    }
56  
    /**
     * Creates a new stream channel by delegating to the super class.
     *
     * @param parent the parent {@link Channel}, passed through unchanged
     * @param socket the {@link LinuxSocket} backing this channel
     * @param remote the remote {@link SocketAddress}, passed through unchanged
     */
    AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
        super(parent, socket, remote);
    }
60  
    /**
     * Returns the {@link ChannelMetadata} constant that is shared by all instances of this class.
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
65  
    /**
     * Creates the stream specific unsafe implementation that schedules and completes reads and writes.
     */
    @Override
    protected final AbstractUringUnsafe newUnsafe() {
        return new IoUringStreamUnsafe();
    }
70  
    /**
     * Convenience overload of {@link #shutdown(ChannelPromise)} that uses a freshly created promise.
     */
    @Override
    public final ChannelFuture shutdown() {
        return shutdown(newPromise());
    }
75  
76      @Override
77      public final ChannelFuture shutdown(final ChannelPromise promise) {
78          ChannelFuture shutdownOutputFuture = shutdownOutput();
79          if (shutdownOutputFuture.isDone()) {
80              shutdownOutputDone(shutdownOutputFuture, promise);
81          } else {
82              shutdownOutputFuture.addListener(new ChannelFutureListener() {
83                  @Override
84                  public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
85                      shutdownOutputDone(shutdownOutputFuture, promise);
86                  }
87              });
88          }
89          return promise;
90      }
91  
    /**
     * Shuts down the output side of the underlying socket.
     *
     * @throws Exception if the socket call fails
     */
    @Override
    protected final void doShutdownOutput() throws Exception {
        // Arguments are presumably (read, write); only the second one is set here - confirm in LinuxSocket.
        socket.shutdown(false, true);
    }
96  
    /**
     * Shuts down the input side of the underlying socket and reports the outcome through {@code promise}.
     *
     * @param promise succeeded when the socket call returns normally, failed with the thrown cause otherwise
     */
    private void shutdownInput0(final ChannelPromise promise) {
        try {
            // Arguments are presumably (read, write); only the first one is set here - confirm in LinuxSocket.
            socket.shutdown(true, false);
            promise.setSuccess();
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
    }
105 
    /**
     * Returns whatever the underlying socket reports for its output-shutdown state.
     */
    @Override
    public final boolean isOutputShutdown() {
        return socket.isOutputShutdown();
    }
110 
    /**
     * Returns whatever the underlying socket reports for its input-shutdown state.
     */
    @Override
    public final boolean isInputShutdown() {
        return socket.isInputShutdown();
    }
115 
    /**
     * Returns whatever the underlying socket reports for its overall shutdown state.
     */
    @Override
    public final boolean isShutdown() {
        return socket.isShutdown();
    }
120 
    /**
     * Convenience overload of {@link #shutdownOutput(ChannelPromise)} that uses a freshly created promise.
     */
    @Override
    public final ChannelFuture shutdownOutput() {
        return shutdownOutput(newPromise());
    }
125 
126     @Override
127     public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
128         EventLoop loop = eventLoop();
129         if (loop.inEventLoop()) {
130             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
131         } else {
132             loop.execute(new Runnable() {
133                 @Override
134                 public void run() {
135                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
136                 }
137             });
138         }
139 
140         return promise;
141     }
142 
    /**
     * Convenience overload of {@link #shutdownInput(ChannelPromise)} that uses a freshly created promise.
     */
    @Override
    public final ChannelFuture shutdownInput() {
        return shutdownInput(newPromise());
    }
147 
148     @Override
149     public final ChannelFuture shutdownInput(final ChannelPromise promise) {
150         EventLoop loop = eventLoop();
151         if (loop.inEventLoop()) {
152             shutdownInput0(promise);
153         } else {
154             loop.execute(new Runnable() {
155                 @Override
156                 public void run() {
157                     shutdownInput0(promise);
158                 }
159             });
160         }
161         return promise;
162     }
163 
164     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
165         ChannelFuture shutdownInputFuture = shutdownInput();
166         if (shutdownInputFuture.isDone()) {
167             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
168         } else {
169             shutdownInputFuture.addListener(new ChannelFutureListener() {
170                 @Override
171                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
172                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
173                 }
174             });
175         }
176     }
177 
178     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
179                                      ChannelFuture shutdownInputFuture,
180                                      ChannelPromise promise) {
181         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
182         Throwable shutdownInputCause = shutdownInputFuture.cause();
183         if (shutdownOutputCause != null) {
184             if (shutdownInputCause != null) {
185                 logger.info("Exception suppressed because a previous exception occurred.",
186                              shutdownInputCause);
187             }
188             promise.setFailure(shutdownOutputCause);
189         } else if (shutdownInputCause != null) {
190             promise.setFailure(shutdownInputCause);
191         } else {
192             promise.setSuccess();
193         }
194     }
195 
196     @Override
197     protected final void doRegister(ChannelPromise promise) {
198         super.doRegister(promise);
199         promise.addListener(f -> {
200             if (f.isSuccess()) {
201                 short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
202                 if (bgid >= 0) {
203                     final IoUringIoHandler ioUringIoHandler = registration().attachment();
204                     bufferRing = ioUringIoHandler.findBufferRing(bgid);
205                 }
206                 if (active) {
207                     // Register for POLLRDHUP if this channel is already considered active.
208                     schedulePollRdHup();
209                 }
210             }
211         });
212     }
213 
214     @Override
215     protected Object filterOutboundMessage(Object msg) {
216         // Since we cannot use synchronous sendfile,
217         // the channel can only support DefaultFileRegion instead of FileRegion.
218         if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
219             return new IoUringFileRegion((DefaultFileRegion) msg);
220         }
221 
222         return super.filterOutboundMessage(msg);
223     }
224 
225     private final class IoUringStreamUnsafe extends AbstractUringUnsafe {
226 
        // Buffer handed to an outstanding recv that does not use the buffer ring; null otherwise.
        private ByteBuf readBuffer;
        // iovec array backing an outstanding writev; released and reset to null when that write completes.
        private IovArray iovArray;
229 
230         @Override
231         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
232             assert iovArray == null;
233             assert writeId == 0;
234             int numElements = Math.min(in.size(), Limits.IOV_MAX);
235             ByteBuf iovArrayBuffer = alloc().directBuffer(numElements * IovArray.IOV_SIZE);
236             iovArray = new IovArray(iovArrayBuffer);
237             try {
238                 int offset = iovArray.count();
239                 in.forEachFlushedMessage(iovArray);
240 
241                 int fd = fd().intValue();
242                 IoRegistration registration = registration();
243                 IoUringIoOps ops = IoUringIoOps.newWritev(fd, flags((byte) 0), 0, iovArray.memoryAddress(offset),
244                         iovArray.count() - offset, nextOpsId());
245                 byte opCode = ops.opcode();
246                 writeId = registration.submit(ops);
247                 writeOpCode = opCode;
248                 if (writeId == 0) {
249                     iovArray.release();
250                     iovArray = null;
251                     return 0;
252                 }
253             } catch (Exception e) {
254                 iovArray.release();
255                 iovArray = null;
256 
257                 // This should never happen, anyway fallback to single write.
258                 scheduleWriteSingle(in.current());
259             }
260             return 1;
261         }
262 
263         @Override
264         protected int scheduleWriteSingle(Object msg) {
265             assert iovArray == null;
266             assert writeId == 0;
267 
268             int fd = fd().intValue();
269             IoRegistration registration = registration();
270             final IoUringIoOps ops;
271             if (msg instanceof IoUringFileRegion) {
272                 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
273                 try {
274                     fileRegion.open();
275                 } catch (IOException e) {
276                     this.handleWriteError(e);
277                     return 0;
278                 }
279                 ops = fileRegion.splice(fd);
280             } else {
281                 ByteBuf buf = (ByteBuf) msg;
282                 ops = IoUringIoOps.newWrite(fd, flags((byte) 0), 0,
283                         buf.memoryAddress() + buf.readerIndex(), buf.readableBytes(), nextOpsId());
284             }
285 
286             byte opCode = ops.opcode();
287             writeId = registration.submit(ops);
288             writeOpCode = opCode;
289             if (writeId == 0) {
290                 return 0;
291             }
292             return 1;
293         }
294 
295         private int calculateRecvFlags(boolean first) {
296             // Depending on if this is the first read or not we will use Native.MSG_DONTWAIT.
297             // The idea is that if the socket is blocking we can do the first read in a blocking fashion
298             // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
299             // be possible directly we schedule these with Native.MSG_DONTWAIT. This allows us to still be
300             // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
301             // transports.
302             if (first) {
303                 return 0;
304             }
305             return Native.MSG_DONTWAIT;
306         }
307 
308         private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
309             // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
310             // attempt.
311             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
312             if (first) {
313                 // IORING_RECVSEND_POLL_FIRST and IORING_CQE_F_SOCK_NONEMPTY were added in the same release (5.19).
314                 // We need to check if it's supported as otherwise providing these would result in an -EINVAL.
315                 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
316                         Native.IORING_RECVSEND_POLL_FIRST : 0;
317             }
318             return 0;
319         }
320 
321         @Override
322         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
323             assert readBuffer == null;
324             assert readId == 0 : readId;
325             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
326 
327             if (bufferRing != null && bufferRing.isUsable()) {
328                 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
329             }
330 
331             // We either have no buffer ring configured or we force a recv without using a buffer ring.
332             ByteBuf byteBuf = allocHandle.allocate(alloc());
333             try {
334                 int fd = fd().intValue();
335                 IoRegistration registration = registration();
336                 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
337                 int recvFlags = calculateRecvFlags(first);
338 
339                 IoUringIoOps ops = IoUringIoOps.newRecv(fd, flags((byte) 0), ioPrio, recvFlags,
340                         byteBuf.memoryAddress() + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
341                 readId = registration.submit(ops);
342                 if (readId == 0) {
343                     return 0;
344                 }
345                 readBuffer = byteBuf;
346                 byteBuf = null;
347                 return 1;
348             } finally {
349                 if (byteBuf != null) {
350                     byteBuf.release();
351                 }
352             }
353         }
354 
355         private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
356             short bgId = bufferRing.bufferGroupId();
357             try {
358                 boolean multishot = IoUring.isRecvMultishotSupported();
359                 byte flags = flags((byte) Native.IOSQE_BUFFER_SELECT);
360                 short ioPrio;
361                 final int recvFlags;
362                 if (multishot) {
363                     ioPrio = Native.IORING_RECV_MULTISHOT;
364                     recvFlags = 0;
365                 } else {
366                     // We should only use the calculate*() methods if this is not a multishot recv, as otherwise
367                     // the would be applied until the multishot will be re-armed.
368                     ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
369                     recvFlags = calculateRecvFlags(first);
370                 }
371                 if (IoUring.isRecvsendBundleSupported()) {
372                     // See https://github.com/axboe/liburing/wiki/
373                     // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
374                     ioPrio |= Native.IORING_RECVSEND_BUNDLE;
375                 }
376                 IoRegistration registration = registration();
377                 int fd = fd().intValue();
378                 IoUringIoOps ops = IoUringIoOps.newRecv(
379                         fd, flags, ioPrio, recvFlags, 0,
380                         0, nextOpsId(), bgId
381                 );
382                 readId = registration.submit(ops);
383                 if (readId == 0) {
384                     return 0;
385                 }
386                 if (multishot) {
387                     // Return -1 to signal we used multishot and so expect multiple recvComplete(...) calls.
388                     return -1;
389                 }
390                 return 1;
391             } catch (IllegalArgumentException illegalArgumentException) {
392                 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
393                 return 0;
394             }
395         }
396 
397         @Override
398         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
399             ByteBuf byteBuf = readBuffer;
400             readBuffer = null;
401             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
402                 readId = 0;
403                 // In case of cancellation we should reset the last used buffer ring to null as we will select a new one
404                 // when calling scheduleRead(..)
405                 if (byteBuf != null) {
406                     //recv without buffer ring
407                     byteBuf.release();
408                 }
409                 return;
410             }
411             assert readId != 0;
412             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
413             boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
414             short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
415             boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;
416 
417             boolean empty = socketIsEmpty(flags);
418             if (rearm) {
419                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
420                 readId = 0;
421             }
422 
423             boolean allDataRead = false;
424 
425             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
426             final ChannelPipeline pipeline = pipeline();
427 
428             try {
429                 if (res < 0) {
430                     if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
431                         // recv with provider buffer failed, Fire the BufferRingExhaustedEvent to notify users.
432                         // About the failure. If this happens to often the user should most likely increase the
433                         // buffer ring size.
434                         pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
435 
436                         // Let's trigger a read again without consulting the RecvByteBufAllocator.Handle as
437                         // we can't count this as a "real" read operation.
438                         // Because of how our BufferRing works we should have it filled again.
439                         scheduleRead(allocHandle.isFirstRead());
440                         return;
441                     }
442 
443                     // If res is negative we should pass it to ioResult(...) which will either throw
444                     // or convert it to 0 if we could not read because the socket was not readable.
445                     allocHandle.lastBytesRead(ioResult("io_uring read", res));
446                 } else if (res > 0) {
447                     if (useBufferRing) {
448 
449                         if (IoUring.isRecvsendBundleSupported()) {
450                             // If RECVSEND_BUNDLE is supported we need to do a bit more work here.
451                             // In this case we might need to obtain multiple buffers out of the buffer ring as
452                             // multiple of them might have been filled for one recv operation.
453                             // See https://github.com/axboe/liburing/wiki/
454                             // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
455                             int read = res;
456                             for (;;) {
457                                 int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
458                                 byteBuf = bufferRing.useBuffer(bid, read, more);
459                                 read -= byteBuf.readableBytes();
460                                 allocHandle.attemptedBytesRead(attemptedBytesRead);
461                                 allocHandle.lastBytesRead(byteBuf.readableBytes());
462 
463                                 assert read >= 0;
464                                 if (read == 0) {
465                                     // Just break here, we will handle the byteBuf below and also fill the bufferRing
466                                     // if needed later.
467                                     break;
468                                 }
469                                 allocHandle.incMessagesRead(1);
470                                 pipeline.fireChannelRead(byteBuf);
471                                 byteBuf = null;
472 
473                                 // Fill a new buffer for the bid after we fired the buffer through the pipeline.
474                                 // We do it only after we called fireChannelRead(...) as there is a good chance
475                                 // that the user will have released the buffer. In this case we reduce memory usage.
476                                 bufferRing.fillBuffer(bid);
477                                 bid = bufferRing.nextBid(bid);
478                                 if (!allocHandle.continueReading()) {
479                                     // We should call fireChannelReadComplete() to mimic a normal read loop.
480                                     allocHandle.readComplete();
481                                     pipeline.fireChannelReadComplete();
482                                     allocHandle.reset(config());
483                                 }
484                             }
485                         } else {
486                             int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
487                             byteBuf = bufferRing.useBuffer(bid, res, more);
488                             allocHandle.attemptedBytesRead(attemptedBytesRead);
489                             allocHandle.lastBytesRead(res);
490                         }
491                     } else {
492                         int attemptedBytesRead = byteBuf.writableBytes();
493                         byteBuf.writerIndex(byteBuf.writerIndex() + res);
494                         allocHandle.attemptedBytesRead(attemptedBytesRead);
495                         allocHandle.lastBytesRead(res);
496                     }
497                 } else {
498                     // EOF which we signal with -1.
499                     allocHandle.lastBytesRead(-1);
500                 }
501                 if (allocHandle.lastBytesRead() <= 0) {
502                     // byteBuf might be null if we used a buffer ring.
503                     if (byteBuf != null) {
504                         // nothing was read, release the buffer.
505                         byteBuf.release();
506                         byteBuf = null;
507                     }
508                     allDataRead = allocHandle.lastBytesRead() < 0;
509                     if (allDataRead) {
510                         // There is nothing left to read as we received an EOF.
511                         shutdownInput(true);
512                     }
513                     allocHandle.readComplete();
514                     pipeline.fireChannelReadComplete();
515                     return;
516                 }
517 
518                 allocHandle.incMessagesRead(1);
519                 pipeline.fireChannelRead(byteBuf);
520                 byteBuf = null;
521                 if (useBufferRing && !more) {
522                     // Fill a new buffer for the bid after we fired the buffer through the pipeline.
523                     // We do it only after we called fireChannelRead(...) as there is a good chance
524                     // that the user will have released the buffer. In this case we reduce memory usage.
525                     bufferRing.fillBuffer(bid);
526                 }
527                 scheduleNextRead(pipeline, allocHandle, rearm, empty);
528             } catch (Throwable t) {
529                 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
530             }
531         }
532 
533         private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
534                                       boolean rearm, boolean empty) {
535             if (allocHandle.continueReading() && !empty) {
536                 if (rearm) {
537                     // We only should schedule another read if we need to rearm.
538                     // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
539                     scheduleRead(false);
540                 }
541             } else {
542                 // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
543                 allocHandle.readComplete();
544                 pipeline.fireChannelReadComplete();
545             }
546         }
547 
548         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
549                                          Throwable cause, boolean allDataRead,
550                                          IoUringRecvByteAllocatorHandle allocHandle) {
551             if (byteBuf != null) {
552                 if (byteBuf.isReadable()) {
553                     pipeline.fireChannelRead(byteBuf);
554                 } else {
555                     byteBuf.release();
556                 }
557             }
558             allocHandle.readComplete();
559             pipeline.fireChannelReadComplete();
560             pipeline.fireExceptionCaught(cause);
561             if (allDataRead || cause instanceof IOException) {
562                 shutdownInput(true);
563             }
564         }
565 
566         @Override
567         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
568             assert writeId != 0;
569             writeId = 0;
570             writeOpCode = 0;
571 
572             ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
573             Object current = channelOutboundBuffer.current();
574             if (current instanceof IoUringFileRegion) {
575                 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
576                 try {
577                     int result = res >= 0 ? res : ioResult("io_uring splice", res);
578                     if (result == 0 && fileRegion.count() > 0) {
579                         validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
580                         return false;
581                     }
582                     int progress = fileRegion.handleResult(result, data);
583                     if (progress == -1) {
584                         // Done with writing
585                         channelOutboundBuffer.remove();
586                     } else if (progress > 0) {
587                         channelOutboundBuffer.progress(progress);
588                     }
589                 } catch (Throwable cause) {
590                     handleWriteError(cause);
591                 }
592                 return true;
593             }
594 
595             IovArray iovArray = this.iovArray;
596             if (iovArray != null) {
597                 this.iovArray = null;
598                 iovArray.release();
599             }
600             if (res >= 0) {
601                 channelOutboundBuffer.removeBytes(res);
602             } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
603                 return true;
604             } else {
605                 try {
606                     if (ioResult("io_uring write", res) == 0) {
607                         return false;
608                     }
609                 } catch (Throwable cause) {
610                     handleWriteError(cause);
611                 }
612             }
613             return true;
614         }
615 
        /**
         * Delegates to the super class and then verifies (with assertions enabled) that no read buffer is
         * still held by this unsafe.
         *
         * @param reg the registration, passed through to the super class
         */
        @Override
        protected void freeResourcesNow(IoRegistration reg) {
            super.freeResourcesNow(reg);
            assert readBuffer == null;
        }
621     }
622 
623     @Override
624     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
625         if (readId != 0) {
626             // Let's try to cancel outstanding reads as these might be submitted and waiting for data (via fastpoll).
627             assert numOutstandingReads == 1 || numOutstandingReads == -1;
628             IoUringIoOps ops = IoUringIoOps.newAsyncCancel(flags((byte) 0), readId, Native.IORING_OP_RECV);
629             long id = registration.submit(ops);
630             assert id != 0;
631         } else {
632             assert numOutstandingReads == 0 || numOutstandingReads == -1;
633         }
634     }
635 
636     @Override
637     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
638         if (writeId != 0) {
639             // Let's try to cancel outstanding writes as these might be submitted and waiting to finish writing
640             // (via fastpoll).
641             assert numOutstandingWrites == 1;
642             assert writeOpCode != 0;
643             long id = registration.submit(IoUringIoOps.newAsyncCancel(flags((byte) 0), writeId, writeOpCode));
644             assert id != 0;
645         } else {
646             assert numOutstandingWrites == 0;
647         }
648     }
649 
650     @Override
651     protected boolean socketIsEmpty(int flags) {
652         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
653     }
654 
655     @Override
656     protected void submitAndRunNow() {
657         if (writeId != 0) {
658             // Force a submit and processing of the completions to ensure we drain the outbound buffer and
659             // send the data to the remote peer.
660             ((IoUringIoHandler) registration().attachment()).submitAndRunNow(writeId);
661         }
662         super.submitAndRunNow();
663     }
664 
665     @Override
666     boolean isPollInFirst() {
667         return bufferRing == null || !bufferRing.isUsable();
668     }
669 }