View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultFileRegion;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.IoRegistration;
29  import io.netty.channel.socket.DuplexChannel;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.channel.unix.Limits;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.io.IOException;
36  import java.net.SocketAddress;
37  
38  import static io.netty.channel.unix.Errors.ioResult;
39  
40  abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
42      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
43  
44      // Store the opCode so we know if we used WRITE or WRITEV.
45      private byte writeOpCode;
46      // Keep track of the ids used for write and read so we can cancel these when needed.
47      private long writeId;
48      private long readId;
49  
50      // The configured buffer ring if any
51      private IoUringBufferRing bufferRing;
52  
53      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
54          super(parent, socket, active);
55      }
56  
57      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
58          super(parent, socket, remote);
59      }
60  
61      @Override
62      public ChannelMetadata metadata() {
63          return METADATA;
64      }
65  
66      @Override
67      protected final AbstractUringUnsafe newUnsafe() {
68          return new IoUringStreamUnsafe();
69      }
70  
71      @Override
72      public final ChannelFuture shutdown() {
73          return shutdown(newPromise());
74      }
75  
76      @Override
77      public final ChannelFuture shutdown(final ChannelPromise promise) {
78          ChannelFuture shutdownOutputFuture = shutdownOutput();
79          if (shutdownOutputFuture.isDone()) {
80              shutdownOutputDone(shutdownOutputFuture, promise);
81          } else {
82              shutdownOutputFuture.addListener(new ChannelFutureListener() {
83                  @Override
84                  public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
85                      shutdownOutputDone(shutdownOutputFuture, promise);
86                  }
87              });
88          }
89          return promise;
90      }
91  
92      @Override
93      protected final void doShutdownOutput() throws Exception {
94          socket.shutdown(false, true);
95      }
96  
97      private void shutdownInput0(final ChannelPromise promise) {
98          try {
99              socket.shutdown(true, false);
100             promise.setSuccess();
101         } catch (Throwable cause) {
102             promise.setFailure(cause);
103         }
104     }
105 
106     @Override
107     public final boolean isOutputShutdown() {
108         return socket.isOutputShutdown();
109     }
110 
111     @Override
112     public final boolean isInputShutdown() {
113         return socket.isInputShutdown();
114     }
115 
116     @Override
117     public final boolean isShutdown() {
118         return socket.isShutdown();
119     }
120 
121     @Override
122     public final ChannelFuture shutdownOutput() {
123         return shutdownOutput(newPromise());
124     }
125 
126     @Override
127     public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
128         EventLoop loop = eventLoop();
129         if (loop.inEventLoop()) {
130             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
131         } else {
132             loop.execute(new Runnable() {
133                 @Override
134                 public void run() {
135                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
136                 }
137             });
138         }
139 
140         return promise;
141     }
142 
143     @Override
144     public final ChannelFuture shutdownInput() {
145         return shutdownInput(newPromise());
146     }
147 
148     @Override
149     public final ChannelFuture shutdownInput(final ChannelPromise promise) {
150         EventLoop loop = eventLoop();
151         if (loop.inEventLoop()) {
152             shutdownInput0(promise);
153         } else {
154             loop.execute(new Runnable() {
155                 @Override
156                 public void run() {
157                     shutdownInput0(promise);
158                 }
159             });
160         }
161         return promise;
162     }
163 
164     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
165         ChannelFuture shutdownInputFuture = shutdownInput();
166         if (shutdownInputFuture.isDone()) {
167             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
168         } else {
169             shutdownInputFuture.addListener(new ChannelFutureListener() {
170                 @Override
171                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
172                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
173                 }
174             });
175         }
176     }
177 
178     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
179                                      ChannelFuture shutdownInputFuture,
180                                      ChannelPromise promise) {
181         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
182         Throwable shutdownInputCause = shutdownInputFuture.cause();
183         if (shutdownOutputCause != null) {
184             if (shutdownInputCause != null) {
185                 logger.info("Exception suppressed because a previous exception occurred.",
186                              shutdownInputCause);
187             }
188             promise.setFailure(shutdownOutputCause);
189         } else if (shutdownInputCause != null) {
190             promise.setFailure(shutdownInputCause);
191         } else {
192             promise.setSuccess();
193         }
194     }
195 
196     @Override
197     protected final void doRegister(ChannelPromise promise) {
198         ChannelPromise registerPromise = this.newPromise();
199         // Ensure that the buffer group is properly set before channel::read
200         registerPromise.addListener(f -> {
201             if (f.isSuccess()) {
202                try {
203                    short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
204                    if (bgid >= 0) {
205                        final IoUringIoHandler ioUringIoHandler = registration().attachment();
206                        bufferRing = ioUringIoHandler.findBufferRing(bgid);
207                    }
208                    if (active) {
209                        // Register for POLLRDHUP if this channel is already considered active.
210                        schedulePollRdHup();
211                    }
212                } finally {
213                    promise.setSuccess();
214                }
215             } else {
216                 promise.setFailure(f.cause());
217             }
218         });
219 
220         super.doRegister(registerPromise);
221     }
222 
223     @Override
224     protected Object filterOutboundMessage(Object msg) {
225         // Since we cannot use synchronous sendfile,
226         // the channel can only support DefaultFileRegion instead of FileRegion.
227         if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
228             return new IoUringFileRegion((DefaultFileRegion) msg);
229         }
230 
231         return super.filterOutboundMessage(msg);
232     }
233 
234     private final class IoUringStreamUnsafe extends AbstractUringUnsafe {
235 
236         private ByteBuf readBuffer;
237 
238         @Override
239         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
240             assert writeId == 0;
241             int numElements = Math.min(in.size(), Limits.IOV_MAX);
242             IoUringIoHandler handler = registration().attachment();
243             IovArray iovArray = handler.iovArray();
244             try {
245                 int offset = iovArray.count();
246                 in.forEachFlushedMessage(iovArray);
247 
248                 int fd = fd().intValue();
249                 IoRegistration registration = registration();
250                 IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArray.memoryAddress(offset),
251                         iovArray.count() - offset, nextOpsId());
252                 byte opCode = ops.opcode();
253                 writeId = registration.submit(ops);
254                 writeOpCode = opCode;
255                 if (writeId == 0) {
256                     return 0;
257                 }
258             } catch (Exception e) {
259                 // This should never happen, anyway fallback to single write.
260                 scheduleWriteSingle(in.current());
261             }
262             return 1;
263         }
264 
265         @Override
266         protected int scheduleWriteSingle(Object msg) {
267             assert writeId == 0;
268 
269             int fd = fd().intValue();
270             IoRegistration registration = registration();
271             final IoUringIoOps ops;
272             if (msg instanceof IoUringFileRegion) {
273                 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
274                 try {
275                     fileRegion.open();
276                 } catch (IOException e) {
277                     this.handleWriteError(e);
278                     return 0;
279                 }
280                 ops = fileRegion.splice(fd);
281             } else {
282                 ByteBuf buf = (ByteBuf) msg;
283                 ops = IoUringIoOps.newWrite(fd, (byte) 0, 0,
284                         IoUring.memoryAddress(buf) + buf.readerIndex(), buf.readableBytes(), nextOpsId());
285             }
286 
287             byte opCode = ops.opcode();
288             writeId = registration.submit(ops);
289             writeOpCode = opCode;
290             if (writeId == 0) {
291                 return 0;
292             }
293             return 1;
294         }
295 
296         private int calculateRecvFlags(boolean first) {
297             // Depending on if this is the first read or not we will use Native.MSG_DONTWAIT.
298             // The idea is that if the socket is blocking we can do the first read in a blocking fashion
299             // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
300             // be possible directly we schedule these with Native.MSG_DONTWAIT. This allows us to still be
301             // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
302             // transports.
303             if (first) {
304                 return 0;
305             }
306             return Native.MSG_DONTWAIT;
307         }
308 
309         private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
310             // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
311             // attempt.
312             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
313             if (first) {
314                 // IORING_RECVSEND_POLL_FIRST and IORING_CQE_F_SOCK_NONEMPTY were added in the same release (5.19).
315                 // We need to check if it's supported as otherwise providing these would result in an -EINVAL.
316                 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
317                         Native.IORING_RECVSEND_POLL_FIRST : 0;
318             }
319             return 0;
320         }
321 
322         @Override
323         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
324             assert readBuffer == null;
325             assert readId == 0 : readId;
326             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
327 
328             if (bufferRing != null && bufferRing.isUsable()) {
329                 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
330             }
331 
332             // We either have no buffer ring configured or we force a recv without using a buffer ring.
333             ByteBuf byteBuf = allocHandle.allocate(alloc());
334             try {
335                 int fd = fd().intValue();
336                 IoRegistration registration = registration();
337                 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
338                 int recvFlags = calculateRecvFlags(first);
339 
340                 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
341                         IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
342                 readId = registration.submit(ops);
343                 if (readId == 0) {
344                     return 0;
345                 }
346                 readBuffer = byteBuf;
347                 byteBuf = null;
348                 return 1;
349             } finally {
350                 if (byteBuf != null) {
351                     byteBuf.release();
352                 }
353             }
354         }
355 
356         private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
357             short bgId = bufferRing.bufferGroupId();
358             try {
359                 boolean multishot = IoUring.isRecvMultishotEnabled();
360                 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
361                 short ioPrio;
362                 final int recvFlags;
363                 if (multishot) {
364                     ioPrio = Native.IORING_RECV_MULTISHOT;
365                     recvFlags = 0;
366                 } else {
367                     // We should only use the calculate*() methods if this is not a multishot recv, as otherwise
368                     // the would be applied until the multishot will be re-armed.
369                     ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
370                     recvFlags = calculateRecvFlags(first);
371                 }
372                 if (IoUring.isRecvsendBundleEnabled()) {
373                     // See https://github.com/axboe/liburing/wiki/
374                     // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
375                     ioPrio |= Native.IORING_RECVSEND_BUNDLE;
376                 }
377                 IoRegistration registration = registration();
378                 int fd = fd().intValue();
379                 IoUringIoOps ops = IoUringIoOps.newRecv(
380                         fd, flags, ioPrio, recvFlags, 0,
381                         0, nextOpsId(), bgId
382                 );
383                 readId = registration.submit(ops);
384                 if (readId == 0) {
385                     return 0;
386                 }
387                 if (multishot) {
388                     // Return -1 to signal we used multishot and so expect multiple recvComplete(...) calls.
389                     return -1;
390                 }
391                 return 1;
392             } catch (IllegalArgumentException illegalArgumentException) {
393                 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
394                 return 0;
395             }
396         }
397 
398         @Override
399         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
400             ByteBuf byteBuf = readBuffer;
401             readBuffer = null;
402             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
403                 readId = 0;
404                 // In case of cancellation we should reset the last used buffer ring to null as we will select a new one
405                 // when calling scheduleRead(..)
406                 if (byteBuf != null) {
407                     //recv without buffer ring
408                     byteBuf.release();
409                 }
410                 return;
411             }
412             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
413             boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
414             short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
415             boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;
416 
417             boolean empty = socketIsEmpty(flags);
418             if (rearm) {
419                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
420                 readId = 0;
421             }
422 
423             boolean allDataRead = false;
424 
425             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
426             final ChannelPipeline pipeline = pipeline();
427 
428             try {
429                 if (res < 0) {
430                     if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
431                         // recv with provider buffer failed, Fire the BufferRingExhaustedEvent to notify users.
432                         // About the failure. If this happens to often the user should most likely increase the
433                         // buffer ring size.
434                         pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
435 
436                         // try to expand the buffer ring by adding more buffers to it if there is any space left.
437                         bufferRing.expand();
438 
439                         // Let's trigger a read again without consulting the RecvByteBufAllocator.Handle as
440                         // we can't count this as a "real" read operation.
441                         // Because of how our BufferRing works we should have it filled again.
442                         scheduleRead(allocHandle.isFirstRead());
443                         return;
444                     }
445 
446                     // If res is negative we should pass it to ioResult(...) which will either throw
447                     // or convert it to 0 if we could not read because the socket was not readable.
448                     allocHandle.lastBytesRead(ioResult("io_uring read", res));
449                 } else if (res > 0) {
450                     if (useBufferRing) {
451 
452                         if (IoUring.isRecvsendBundleEnabled()) {
453                             // If RECVSEND_BUNDLE is supported we need to do a bit more work here.
454                             // In this case we might need to obtain multiple buffers out of the buffer ring as
455                             // multiple of them might have been filled for one recv operation.
456                             // See https://github.com/axboe/liburing/wiki/
457                             // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
458                             int read = res;
459                             for (;;) {
460                                 int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
461                                 byteBuf = bufferRing.useBuffer(bid, read, more);
462                                 read -= byteBuf.readableBytes();
463                                 allocHandle.attemptedBytesRead(attemptedBytesRead);
464                                 allocHandle.lastBytesRead(byteBuf.readableBytes());
465 
466                                 assert read >= 0;
467                                 if (read == 0) {
468                                     // Just break here, we will handle the byteBuf below and also fill the bufferRing
469                                     // if needed later.
470                                     break;
471                                 }
472                                 allocHandle.incMessagesRead(1);
473                                 pipeline.fireChannelRead(byteBuf);
474                                 byteBuf = null;
475 
476                                 // Fill a new buffer for the bid after we fired the buffer through the pipeline.
477                                 // We do it only after we called fireChannelRead(...) as there is a good chance
478                                 // that the user will have released the buffer. In this case we reduce memory usage.
479                                 bufferRing.fillBuffer();
480                                 bid = bufferRing.nextBid(bid);
481                                 if (!allocHandle.continueReading()) {
482                                     // We should call fireChannelReadComplete() to mimic a normal read loop.
483                                     allocHandle.readComplete();
484                                     pipeline.fireChannelReadComplete();
485                                     allocHandle.reset(config());
486                                 }
487                             }
488                         } else {
489                             int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
490                             byteBuf = bufferRing.useBuffer(bid, res, more);
491                             allocHandle.attemptedBytesRead(attemptedBytesRead);
492                             allocHandle.lastBytesRead(res);
493                         }
494                     } else {
495                         int attemptedBytesRead = byteBuf.writableBytes();
496                         byteBuf.writerIndex(byteBuf.writerIndex() + res);
497                         allocHandle.attemptedBytesRead(attemptedBytesRead);
498                         allocHandle.lastBytesRead(res);
499                     }
500                 } else {
501                     // EOF which we signal with -1.
502                     allocHandle.lastBytesRead(-1);
503                 }
504                 if (allocHandle.lastBytesRead() <= 0) {
505                     // byteBuf might be null if we used a buffer ring.
506                     if (byteBuf != null) {
507                         // nothing was read, release the buffer.
508                         byteBuf.release();
509                         byteBuf = null;
510                     }
511                     allDataRead = allocHandle.lastBytesRead() < 0;
512                     if (allDataRead) {
513                         // There is nothing left to read as we received an EOF.
514                         shutdownInput(true);
515                     }
516                     allocHandle.readComplete();
517                     pipeline.fireChannelReadComplete();
518                     return;
519                 }
520 
521                 allocHandle.incMessagesRead(1);
522                 pipeline.fireChannelRead(byteBuf);
523                 byteBuf = null;
524                 if (useBufferRing && !more) {
525                     // Fill a new buffer for the bid after we fired the buffer through the pipeline.
526                     // We do it only after we called fireChannelRead(...) as there is a good chance
527                     // that the user will have released the buffer. In this case we reduce memory usage.
528                     bufferRing.fillBuffer();
529                 }
530                 scheduleNextRead(pipeline, allocHandle, rearm, empty);
531             } catch (Throwable t) {
532                 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
533             }
534         }
535 
536         private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
537                                       boolean rearm, boolean empty) {
538             if (allocHandle.continueReading() && !empty) {
539                 if (rearm) {
540                     // We only should schedule another read if we need to rearm.
541                     // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
542                     scheduleRead(false);
543                 }
544             } else {
545                 // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
546                 allocHandle.readComplete();
547                 pipeline.fireChannelReadComplete();
548             }
549         }
550 
551         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
552                                          Throwable cause, boolean allDataRead,
553                                          IoUringRecvByteAllocatorHandle allocHandle) {
554             if (byteBuf != null) {
555                 if (byteBuf.isReadable()) {
556                     pipeline.fireChannelRead(byteBuf);
557                 } else {
558                     byteBuf.release();
559                 }
560             }
561             allocHandle.readComplete();
562             pipeline.fireChannelReadComplete();
563             pipeline.fireExceptionCaught(cause);
564             if (allDataRead || cause instanceof IOException) {
565                 shutdownInput(true);
566             }
567         }
568 
569         @Override
570         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
571             writeId = 0;
572             writeOpCode = 0;
573 
574             ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
575             Object current = channelOutboundBuffer.current();
576             if (current instanceof IoUringFileRegion) {
577                 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
578                 try {
579                     int result = res >= 0 ? res : ioResult("io_uring splice", res);
580                     if (result == 0 && fileRegion.count() > 0) {
581                         validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
582                         return false;
583                     }
584                     int progress = fileRegion.handleResult(result, data);
585                     if (progress == -1) {
586                         // Done with writing
587                         channelOutboundBuffer.remove();
588                     } else if (progress > 0) {
589                         channelOutboundBuffer.progress(progress);
590                     }
591                 } catch (Throwable cause) {
592                     handleWriteError(cause);
593                 }
594                 return true;
595             }
596 
597             if (res >= 0) {
598                 channelOutboundBuffer.removeBytes(res);
599             } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
600                 return true;
601             } else {
602                 try {
603                     if (ioResult("io_uring write", res) == 0) {
604                         return false;
605                     }
606                 } catch (Throwable cause) {
607                     handleWriteError(cause);
608                 }
609             }
610             return true;
611         }
612 
613         @Override
614         protected void freeResourcesNow(IoRegistration reg) {
615             super.freeResourcesNow(reg);
616             assert readBuffer == null;
617         }
618     }
619 
620     @Override
621     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
622         if (readId != 0) {
623             // Let's try to cancel outstanding reads as these might be submitted and waiting for data (via fastpoll).
624             assert numOutstandingReads == 1 || numOutstandingReads == -1;
625             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, Native.IORING_OP_RECV);
626             long id = registration.submit(ops);
627             assert id != 0;
628             readId = 0;
629         }
630     }
631 
632     @Override
633     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
634         if (writeId != 0) {
635             // Let's try to cancel outstanding writes as these might be submitted and waiting to finish writing
636             // (via fastpoll).
637             assert numOutstandingWrites == 1;
638             assert writeOpCode != 0;
639             long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
640             assert id != 0;
641             writeId = 0;
642         }
643     }
644 
645     @Override
646     protected boolean socketIsEmpty(int flags) {
647         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
648     }
649 
650     @Override
651     protected void submitAndRunNow() {
652         if (writeId != 0) {
653             // Force a submit and processing of the completions to ensure we drain the outbound buffer and
654             // send the data to the remote peer.
655             ((IoUringIoHandler) registration().attachment()).submitAndRunNow(writeId);
656         }
657         super.submitAndRunNow();
658     }
659 
660     @Override
661     boolean isPollInFirst() {
662         return bufferRing == null || !bufferRing.isUsable();
663     }
664 }