/*
 * Copyright 2024 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultFileRegion;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.IoRegistration;
29  import io.netty.channel.socket.DuplexChannel;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.net.SocketAddress;
36  
37  import static io.netty.channel.unix.Errors.ioResult;
38  
39  abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
40      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
41      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
42  
43      // Store the opCode so we know if we used WRITE or WRITEV.
44      byte writeOpCode;
45      // Keep track of the ids used for write and read so we can cancel these when needed.
46      long writeId;
47      byte readOpCode;
48      long readId;
49  
50      // The configured buffer ring if any
51      private IoUringBufferRing bufferRing;
52  
53      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
54          super(parent, socket, active);
55      }
56  
57      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
58          super(parent, socket, remote);
59      }
60  
61      @Override
62      public ChannelMetadata metadata() {
63          return METADATA;
64      }
65  
66      @Override
67      protected AbstractUringUnsafe newUnsafe() {
68          return new IoUringStreamUnsafe();
69      }
70  
71      @Override
72      public final ChannelFuture shutdown() {
73          return shutdown(newPromise());
74      }
75  
76      @Override
77      public final ChannelFuture shutdown(final ChannelPromise promise) {
78          ChannelFuture shutdownOutputFuture = shutdownOutput();
79          if (shutdownOutputFuture.isDone()) {
80              shutdownOutputDone(shutdownOutputFuture, promise);
81          } else {
82              shutdownOutputFuture.addListener(new ChannelFutureListener() {
83                  @Override
84                  public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
85                      shutdownOutputDone(shutdownOutputFuture, promise);
86                  }
87              });
88          }
89          return promise;
90      }
91  
92      @Override
93      protected final void doShutdownOutput() throws Exception {
94          socket.shutdown(false, true);
95      }
96  
97      private void shutdownInput0(final ChannelPromise promise) {
98          try {
99              socket.shutdown(true, false);
100             promise.setSuccess();
101         } catch (Throwable cause) {
102             promise.setFailure(cause);
103         }
104     }
105 
106     @Override
107     public final boolean isOutputShutdown() {
108         return socket.isOutputShutdown();
109     }
110 
111     @Override
112     public final boolean isInputShutdown() {
113         return socket.isInputShutdown();
114     }
115 
116     @Override
117     public final boolean isShutdown() {
118         return socket.isShutdown();
119     }
120 
121     @Override
122     public final ChannelFuture shutdownOutput() {
123         return shutdownOutput(newPromise());
124     }
125 
126     @Override
127     public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
128         EventLoop loop = eventLoop();
129         if (loop.inEventLoop()) {
130             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
131         } else {
132             loop.execute(new Runnable() {
133                 @Override
134                 public void run() {
135                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
136                 }
137             });
138         }
139 
140         return promise;
141     }
142 
143     @Override
144     public final ChannelFuture shutdownInput() {
145         return shutdownInput(newPromise());
146     }
147 
148     @Override
149     public final ChannelFuture shutdownInput(final ChannelPromise promise) {
150         EventLoop loop = eventLoop();
151         if (loop.inEventLoop()) {
152             shutdownInput0(promise);
153         } else {
154             loop.execute(new Runnable() {
155                 @Override
156                 public void run() {
157                     shutdownInput0(promise);
158                 }
159             });
160         }
161         return promise;
162     }
163 
164     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
165         ChannelFuture shutdownInputFuture = shutdownInput();
166         if (shutdownInputFuture.isDone()) {
167             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
168         } else {
169             shutdownInputFuture.addListener(new ChannelFutureListener() {
170                 @Override
171                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
172                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
173                 }
174             });
175         }
176     }
177 
178     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
179                                      ChannelFuture shutdownInputFuture,
180                                      ChannelPromise promise) {
181         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
182         Throwable shutdownInputCause = shutdownInputFuture.cause();
183         if (shutdownOutputCause != null) {
184             if (shutdownInputCause != null) {
185                 logger.info("Exception suppressed because a previous exception occurred.",
186                              shutdownInputCause);
187             }
188             promise.setFailure(shutdownOutputCause);
189         } else if (shutdownInputCause != null) {
190             promise.setFailure(shutdownInputCause);
191         } else {
192             promise.setSuccess();
193         }
194     }
195 
196     @Override
197     protected final void doRegister(ChannelPromise promise) {
198         ChannelPromise registerPromise = this.newPromise();
199         // Ensure that the buffer group is properly set before channel::read
200         registerPromise.addListener(f -> {
201             if (f.isSuccess()) {
202                try {
203                    short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
204                    if (bgid >= 0) {
205                        final IoUringIoHandler ioUringIoHandler = registration().attachment();
206                        bufferRing = ioUringIoHandler.findBufferRing(bgid);
207                    }
208                    if (active) {
209                        // Register for POLLRDHUP if this channel is already considered active.
210                        schedulePollRdHup();
211                    }
212                } finally {
213                    promise.setSuccess();
214                }
215             } else {
216                 promise.setFailure(f.cause());
217             }
218         });
219 
220         super.doRegister(registerPromise);
221     }
222 
223     @Override
224     protected Object filterOutboundMessage(Object msg) {
225         // Since we cannot use synchronous sendfile,
226         // the channel can only support DefaultFileRegion instead of FileRegion.
227         if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
228             return new IoUringFileRegion((DefaultFileRegion) msg);
229         }
230 
231         return super.filterOutboundMessage(msg);
232     }
233 
234     protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
235 
        // The buffer handed to the outstanding recv that was scheduled without a buffer ring.
        // It is set by scheduleRead0(...) after a successful submit and cleared again by readComplete0(...).
        private ByteBuf readBuffer;
237 
238         @Override
239         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
240             assert writeId == 0;
241 
242             int fd = fd().intValue();
243             IoRegistration registration = registration();
244             IoUringIoHandler handler = registration.attachment();
245             IovArray iovArray = handler.iovArray();
246             int offset = iovArray.count();
247 
248             try {
249                 in.forEachFlushedMessage(filterWriteMultiple(iovArray));
250             } catch (Exception e) {
251                 // This should never happen, anyway fallback to single write.
252                 return scheduleWriteSingle(in.current());
253             }
254             long iovArrayAddress = iovArray.memoryAddress(offset);
255             int iovArrayLength = iovArray.count() - offset;
256             // Should not use sendmsg_zc, just use normal writev.
257             IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArrayAddress, iovArrayLength, nextOpsId());
258 
259             byte opCode = ops.opcode();
260             writeId = registration.submit(ops);
261             writeOpCode = opCode;
262             if (writeId == 0) {
263                 return 0;
264             }
265             return 1;
266         }
267 
268         protected ChannelOutboundBuffer.MessageProcessor filterWriteMultiple(IovArray iovArray) {
269            return iovArray;
270         }
271 
272         @Override
273         protected int scheduleWriteSingle(Object msg) {
274             assert writeId == 0;
275 
276             int fd = fd().intValue();
277             IoRegistration registration = registration();
278             final IoUringIoOps ops;
279             if (msg instanceof IoUringFileRegion) {
280                 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
281                 try {
282                     fileRegion.open();
283                 } catch (IOException e) {
284                     this.handleWriteError(e);
285                     return 0;
286                 }
287                 ops = fileRegion.splice(fd);
288             } else {
289                 ByteBuf buf = (ByteBuf) msg;
290                 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
291                 int length = buf.readableBytes();
292                 short opsid = nextOpsId();
293 
294                 ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, opsid);
295             }
296             byte opCode = ops.opcode();
297             writeId = registration.submit(ops);
298             writeOpCode = opCode;
299             if (writeId == 0) {
300                 return 0;
301             }
302             return 1;
303         }
304 
305         private int calculateRecvFlags(boolean first) {
306             // Depending on if this is the first read or not we will use Native.MSG_DONTWAIT.
307             // The idea is that if the socket is blocking we can do the first read in a blocking fashion
308             // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
309             // be possible directly we schedule these with Native.MSG_DONTWAIT. This allows us to still be
310             // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
311             // transports.
312             if (first) {
313                 return 0;
314             }
315             return Native.MSG_DONTWAIT;
316         }
317 
318         private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
319             // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
320             // attempt.
321             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
322             if (first) {
323                 // IORING_RECVSEND_POLL_FIRST and IORING_CQE_F_SOCK_NONEMPTY were added in the same release (5.19).
324                 // We need to check if it's supported as otherwise providing these would result in an -EINVAL.
325                 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
326                         Native.IORING_RECVSEND_POLL_FIRST : 0;
327             }
328             return 0;
329         }
330 
331         @Override
332         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
333             assert readBuffer == null;
334             assert readId == 0 : readId;
335             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
336 
337             if (bufferRing != null && bufferRing.isUsable()) {
338                 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
339             }
340 
341             // We either have no buffer ring configured or we force a recv without using a buffer ring.
342             ByteBuf byteBuf = allocHandle.allocate(alloc());
343             try {
344                 int fd = fd().intValue();
345                 IoRegistration registration = registration();
346                 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
347                 int recvFlags = calculateRecvFlags(first);
348 
349                 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
350                         IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
351                 readId = registration.submit(ops);
352                 readOpCode = Native.IORING_OP_RECV;
353                 if (readId == 0) {
354                     return 0;
355                 }
356                 readBuffer = byteBuf;
357                 byteBuf = null;
358                 return 1;
359             } finally {
360                 if (byteBuf != null) {
361                     byteBuf.release();
362                 }
363             }
364         }
365 
366         private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
367             short bgId = bufferRing.bufferGroupId();
368             try {
369                 boolean multishot = IoUring.isRecvMultishotEnabled();
370                 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
371                 short ioPrio;
372                 final int recvFlags;
373                 if (multishot) {
374                     ioPrio = Native.IORING_RECV_MULTISHOT;
375                     recvFlags = 0;
376                 } else {
377                     // We should only use the calculate*() methods if this is not a multishot recv, as otherwise
378                     // the would be applied until the multishot will be re-armed.
379                     ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
380                     recvFlags = calculateRecvFlags(first);
381                 }
382                 if (IoUring.isRecvsendBundleEnabled()) {
383                     // See https://github.com/axboe/liburing/wiki/
384                     // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
385                     ioPrio |= Native.IORING_RECVSEND_BUNDLE;
386                 }
387                 IoRegistration registration = registration();
388                 int fd = fd().intValue();
389                 IoUringIoOps ops = IoUringIoOps.newRecv(
390                         fd, flags, ioPrio, recvFlags, 0,
391                         0, nextOpsId(), bgId
392                 );
393                 readId = registration.submit(ops);
394                 readOpCode = Native.IORING_OP_RECV;
395                 if (readId == 0) {
396                     return 0;
397                 }
398                 if (multishot) {
399                     // Return -1 to signal we used multishot and so expect multiple recvComplete(...) calls.
400                     return -1;
401                 }
402                 return 1;
403             } catch (IllegalArgumentException illegalArgumentException) {
404                 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
405                 return 0;
406             }
407         }
408 
409         @Override
410         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
411             ByteBuf byteBuf = readBuffer;
412             readBuffer = null;
413             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
414                 readId = 0;
415                 // In case of cancellation we should reset the last used buffer ring to null as we will select a new one
416                 // when calling scheduleRead(..)
417                 if (byteBuf != null) {
418                     //recv without buffer ring
419                     byteBuf.release();
420                 }
421                 return;
422             }
423             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
424             boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
425             short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
426             boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;
427 
428             boolean empty = socketIsEmpty(flags);
429             if (rearm) {
430                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
431                 readId = 0;
432             }
433 
434             boolean allDataRead = false;
435 
436             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
437             final ChannelPipeline pipeline = pipeline();
438 
439             try {
440                 if (res < 0) {
441                     if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
442                         // try to expand the buffer ring by adding more buffers to it if there is any space left.
443                         if (!bufferRing.expand()) {
444                             // We couldn't expand the ring anymore so notify the user that we did run out of buffers
445                             // without the ability to expand it.
446                             // If this happens to often the user should most likely increase the buffer ring size.
447                             pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
448                         }
449 
450                         // Let's trigger a read again without consulting the RecvByteBufAllocator.Handle as
451                         // we can't count this as a "real" read operation.
452                         // Because of how our BufferRing works we should have it filled again.
453                         scheduleRead(allocHandle.isFirstRead());
454                         return;
455                     }
456 
457                     // If res is negative we should pass it to ioResult(...) which will either throw
458                     // or convert it to 0 if we could not read because the socket was not readable.
459                     allocHandle.lastBytesRead(ioResult("io_uring read", res));
460                 } else if (res > 0) {
461                     if (useBufferRing) {
462                         // If RECVSEND_BUNDLE is used we need to do a bit more work here.
463                         // In this case we might need to obtain multiple buffers out of the buffer ring as
464                         // multiple of them might have been filled for one recv operation.
465                         // See https://github.com/axboe/liburing/wiki/
466                         // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
467                         int read = res;
468                         for (;;) {
469                             int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
470                             byteBuf = bufferRing.useBuffer(bid, read, more);
471                             read -= byteBuf.readableBytes();
472                             allocHandle.attemptedBytesRead(attemptedBytesRead);
473                             allocHandle.lastBytesRead(byteBuf.readableBytes());
474 
475                             assert read >= 0;
476                             if (read == 0) {
477                                 // Just break here, we will handle the byteBuf below and also fill the bufferRing
478                                 // if needed later.
479                                 break;
480                             }
481                             allocHandle.incMessagesRead(1);
482                             pipeline.fireChannelRead(byteBuf);
483                             byteBuf = null;
484                             bid = bufferRing.nextBid(bid);
485                             if (!allocHandle.continueReading()) {
486                                 // We should call fireChannelReadComplete() to mimic a normal read loop.
487                                 allocHandle.readComplete();
488                                 pipeline.fireChannelReadComplete();
489                                 allocHandle.reset(config());
490                             }
491                         }
492                     } else {
493                         int attemptedBytesRead = byteBuf.writableBytes();
494                         byteBuf.writerIndex(byteBuf.writerIndex() + res);
495                         allocHandle.attemptedBytesRead(attemptedBytesRead);
496                         allocHandle.lastBytesRead(res);
497                     }
498                 } else {
499                     // EOF which we signal with -1.
500                     allocHandle.lastBytesRead(-1);
501                 }
502                 if (allocHandle.lastBytesRead() <= 0) {
503                     // byteBuf might be null if we used a buffer ring.
504                     if (byteBuf != null) {
505                         // nothing was read, release the buffer.
506                         byteBuf.release();
507                         byteBuf = null;
508                     }
509                     allDataRead = allocHandle.lastBytesRead() < 0;
510                     if (allDataRead) {
511                         // There is nothing left to read as we received an EOF.
512                         shutdownInput(true);
513                     }
514                     allocHandle.readComplete();
515                     pipeline.fireChannelReadComplete();
516                     return;
517                 }
518 
519                 allocHandle.incMessagesRead(1);
520                 pipeline.fireChannelRead(byteBuf);
521                 byteBuf = null;
522                 scheduleNextRead(pipeline, allocHandle, rearm, empty);
523             } catch (Throwable t) {
524                 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
525             }
526         }
527 
528         private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
529                                       boolean rearm, boolean empty) {
530             if (allocHandle.continueReading() && !empty) {
531                 if (rearm) {
532                     // We only should schedule another read if we need to rearm.
533                     // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
534                     scheduleRead(false);
535                 }
536             } else {
537                 // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
538                 allocHandle.readComplete();
539                 pipeline.fireChannelReadComplete();
540             }
541         }
542 
543         protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
544                                          Throwable cause, boolean allDataRead,
545                                          IoUringRecvByteAllocatorHandle allocHandle) {
546             if (byteBuf != null) {
547                 if (byteBuf.isReadable()) {
548                     pipeline.fireChannelRead(byteBuf);
549                 } else {
550                     byteBuf.release();
551                 }
552             }
553             allocHandle.readComplete();
554             pipeline.fireChannelReadComplete();
555             pipeline.fireExceptionCaught(cause);
556             if (allDataRead || cause instanceof IOException) {
557                 shutdownInput(true);
558             }
559         }
560 
561         private boolean handleWriteCompleteFileRegion(ChannelOutboundBuffer channelOutboundBuffer,
562                                                       IoUringFileRegion fileRegion, int res, short data) {
563             try {
564                 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
565                     return true;
566                 }
567                 int result = res >= 0 ? res : ioResult("io_uring splice", res);
568                 if (result == 0 && fileRegion.count() > 0) {
569                     validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
570                     return false;
571                 }
572                 int progress = fileRegion.handleResult(result, data);
573                 if (progress == -1) {
574                     // Done with writing
575                     channelOutboundBuffer.remove();
576                 } else if (progress > 0) {
577                     channelOutboundBuffer.progress(progress);
578                 }
579             } catch (Throwable cause) {
580                 handleWriteError(cause);
581             }
582             return true;
583         }
584 
585         @Override
586         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
587             if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
588                 // We only want to reset these if IORING_CQE_F_NOTIF is not set.
589                 // If it's set we know this is only an extra notification for a write but we already handled
590                 // the write completions before.
591                 // See https://man7.org/linux/man-pages/man2/io_uring_enter.2.html section: IORING_OP_SEND_ZC
592                 writeId = 0;
593                 writeOpCode = 0;
594             }
595             ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
596             Object current = channelOutboundBuffer.current();
597             if (current instanceof IoUringFileRegion) {
598                 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
599                 return handleWriteCompleteFileRegion(channelOutboundBuffer, fileRegion, res, data);
600             }
601 
602             if (res >= 0) {
603                 channelOutboundBuffer.removeBytes(res);
604             } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
605                 return true;
606             } else {
607                 try {
608                     if (ioResult("io_uring write", res) == 0) {
609                         return false;
610                     }
611                 } catch (Throwable cause) {
612                     handleWriteError(cause);
613                 }
614             }
615             return true;
616         }
617 
618         @Override
619         public void unregistered() {
620             super.unregistered();
621             assert readBuffer == null;
622         }
623     }
624 
625     @Override
626     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
627         if (readId != 0) {
628             // Let's try to cancel outstanding reads as these might be submitted and waiting for data (via fastpoll).
629             assert numOutstandingReads == 1 || numOutstandingReads == -1;
630             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
631             long id = registration.submit(ops);
632             assert id != 0;
633             readId = 0;
634         }
635     }
636 
637     @Override
638     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
639         if (writeId != 0) {
640             // Let's try to cancel outstanding writes as these might be submitted and waiting to finish writing
641             // (via fastpoll).
642             assert numOutstandingWrites == 1;
643             assert writeOpCode != 0;
644             long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
645             assert id != 0;
646             writeId = 0;
647         }
648     }
649 
650     @Override
651     protected boolean socketIsEmpty(int flags) {
652         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
653     }
654 
655     @Override
656     boolean isPollInFirst() {
657         return bufferRing == null || !bufferRing.isUsable();
658     }
659 }