View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultFileRegion;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.IoRegistration;
29  import io.netty.channel.socket.DuplexChannel;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.net.SocketAddress;
36  
37  import static io.netty.channel.unix.Errors.ioResult;
38  
39  abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
40      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
41      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
42  
43      // Store the opCode so we know if we used WRITE or WRITEV.
44      byte writeOpCode;
45      // Keep track of the ids used for write and read so we can cancel these when needed.
46      long writeId;
47      byte readOpCode;
48      long readId;
49  
50      // The configured buffer ring if any
51      private IoUringBufferRing bufferRing;
52  
53      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
54          super(parent, socket, active);
55      }
56  
57      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
58          super(parent, socket, remote);
59      }
60  
61      @Override
62      public ChannelMetadata metadata() {
63          return METADATA;
64      }
65  
66      @Override
67      protected AbstractUringUnsafe newUnsafe() {
68          return new IoUringStreamUnsafe();
69      }
70  
71      @Override
72      public final ChannelFuture shutdown() {
73          return shutdown(newPromise());
74      }
75  
76      @Override
77      public final ChannelFuture shutdown(final ChannelPromise promise) {
78          ChannelFuture shutdownOutputFuture = shutdownOutput();
79          if (shutdownOutputFuture.isDone()) {
80              shutdownOutputDone(shutdownOutputFuture, promise);
81          } else {
82              shutdownOutputFuture.addListener(new ChannelFutureListener() {
83                  @Override
84                  public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
85                      shutdownOutputDone(shutdownOutputFuture, promise);
86                  }
87              });
88          }
89          return promise;
90      }
91  
92      @Override
93      protected final void doShutdownOutput() throws Exception {
94          socket.shutdown(false, true);
95      }
96  
97      private void shutdownInput0(final ChannelPromise promise) {
98          try {
99              socket.shutdown(true, false);
100             promise.setSuccess();
101         } catch (Throwable cause) {
102             promise.setFailure(cause);
103         }
104     }
105 
106     @Override
107     public final boolean isOutputShutdown() {
108         return socket.isOutputShutdown();
109     }
110 
111     @Override
112     public final boolean isInputShutdown() {
113         return socket.isInputShutdown();
114     }
115 
116     @Override
117     public final boolean isShutdown() {
118         return socket.isShutdown();
119     }
120 
121     @Override
122     public final ChannelFuture shutdownOutput() {
123         return shutdownOutput(newPromise());
124     }
125 
126     @Override
127     public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
128         EventLoop loop = eventLoop();
129         if (loop.inEventLoop()) {
130             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
131         } else {
132             loop.execute(new Runnable() {
133                 @Override
134                 public void run() {
135                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
136                 }
137             });
138         }
139 
140         return promise;
141     }
142 
143     @Override
144     public final ChannelFuture shutdownInput() {
145         return shutdownInput(newPromise());
146     }
147 
148     @Override
149     public final ChannelFuture shutdownInput(final ChannelPromise promise) {
150         EventLoop loop = eventLoop();
151         if (loop.inEventLoop()) {
152             shutdownInput0(promise);
153         } else {
154             loop.execute(new Runnable() {
155                 @Override
156                 public void run() {
157                     shutdownInput0(promise);
158                 }
159             });
160         }
161         return promise;
162     }
163 
164     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
165         ChannelFuture shutdownInputFuture = shutdownInput();
166         if (shutdownInputFuture.isDone()) {
167             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
168         } else {
169             shutdownInputFuture.addListener(new ChannelFutureListener() {
170                 @Override
171                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
172                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
173                 }
174             });
175         }
176     }
177 
178     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
179                                      ChannelFuture shutdownInputFuture,
180                                      ChannelPromise promise) {
181         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
182         Throwable shutdownInputCause = shutdownInputFuture.cause();
183         if (shutdownOutputCause != null) {
184             if (shutdownInputCause != null) {
185                 logger.info("Exception suppressed because a previous exception occurred.",
186                              shutdownInputCause);
187             }
188             promise.setFailure(shutdownOutputCause);
189         } else if (shutdownInputCause != null) {
190             promise.setFailure(shutdownInputCause);
191         } else {
192             promise.setSuccess();
193         }
194     }
195 
196     @Override
197     protected final void doRegister(ChannelPromise promise) {
198         ChannelPromise registerPromise = this.newPromise();
199         // Ensure that the buffer group is properly set before channel::read
200         registerPromise.addListener(f -> {
201             if (f.isSuccess()) {
202                try {
203                    short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
204                    if (bgid >= 0) {
205                        final IoUringIoHandler ioUringIoHandler = registration().attachment();
206                        bufferRing = ioUringIoHandler.findBufferRing(bgid);
207                    }
208                    if (active) {
209                        // Register for POLLRDHUP if this channel is already considered active.
210                        schedulePollRdHup();
211                    }
212                } finally {
213                    promise.setSuccess();
214                }
215             } else {
216                 promise.setFailure(f.cause());
217             }
218         });
219 
220         super.doRegister(registerPromise);
221     }
222 
223     @Override
224     protected Object filterOutboundMessage(Object msg) {
225         // Since we cannot use synchronous sendfile,
226         // the channel can only support DefaultFileRegion instead of FileRegion.
227         if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
228             return new IoUringFileRegion((DefaultFileRegion) msg);
229         }
230 
231         return super.filterOutboundMessage(msg);
232     }
233 
234     protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
235 
236         private ByteBuf readBuffer;
237 
238         @Override
239         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
240             assert writeId == 0;
241 
242             int fd = fd().intValue();
243             IoRegistration registration = registration();
244             IoUringIoHandler handler = registration.attachment();
245             IovArray iovArray = handler.iovArray();
246             int offset = iovArray.count();
247 
248             try {
249                 in.forEachFlushedMessage(iovArray);
250             } catch (Exception e) {
251                 // This should never happen, anyway fallback to single write.
252                 return scheduleWriteSingle(in.current());
253             }
254             long iovArrayAddress = iovArray.memoryAddress(offset);
255             int iovArrayLength = iovArray.count() - offset;
256             // Should not use sendmsg_zc, just use normal writev.
257             IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArrayAddress, iovArrayLength, nextOpsId());
258 
259             byte opCode = ops.opcode();
260             writeId = registration.submit(ops);
261             writeOpCode = opCode;
262             if (writeId == 0) {
263                 return 0;
264             }
265             return 1;
266         }
267 
268         @Override
269         protected int scheduleWriteSingle(Object msg) {
270             assert writeId == 0;
271 
272             int fd = fd().intValue();
273             IoRegistration registration = registration();
274             final IoUringIoOps ops;
275             if (msg instanceof IoUringFileRegion) {
276                 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
277                 try {
278                     fileRegion.open();
279                 } catch (IOException e) {
280                     this.handleWriteError(e);
281                     return 0;
282                 }
283                 ops = fileRegion.splice(fd);
284             } else {
285                 ByteBuf buf = (ByteBuf) msg;
286                 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
287                 int length = buf.readableBytes();
288                 short opsid = nextOpsId();
289 
290                 ops = IoUringIoOps.newWrite(fd, (byte) 0, 0, address, length, opsid);
291             }
292             byte opCode = ops.opcode();
293             writeId = registration.submit(ops);
294             writeOpCode = opCode;
295             if (writeId == 0) {
296                 return 0;
297             }
298             return 1;
299         }
300 
301         private int calculateRecvFlags(boolean first) {
302             // Depending on if this is the first read or not we will use Native.MSG_DONTWAIT.
303             // The idea is that if the socket is blocking we can do the first read in a blocking fashion
304             // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
305             // be possible directly we schedule these with Native.MSG_DONTWAIT. This allows us to still be
306             // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
307             // transports.
308             if (first) {
309                 return 0;
310             }
311             return Native.MSG_DONTWAIT;
312         }
313 
314         private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
315             // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
316             // attempt.
317             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
318             if (first) {
319                 // IORING_RECVSEND_POLL_FIRST and IORING_CQE_F_SOCK_NONEMPTY were added in the same release (5.19).
320                 // We need to check if it's supported as otherwise providing these would result in an -EINVAL.
321                 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
322                         Native.IORING_RECVSEND_POLL_FIRST : 0;
323             }
324             return 0;
325         }
326 
327         @Override
328         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
329             assert readBuffer == null;
330             assert readId == 0 : readId;
331             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
332 
333             if (bufferRing != null && bufferRing.isUsable()) {
334                 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
335             }
336 
337             // We either have no buffer ring configured or we force a recv without using a buffer ring.
338             ByteBuf byteBuf = allocHandle.allocate(alloc());
339             try {
340                 int fd = fd().intValue();
341                 IoRegistration registration = registration();
342                 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
343                 int recvFlags = calculateRecvFlags(first);
344 
345                 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
346                         IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
347                 readId = registration.submit(ops);
348                 readOpCode = Native.IORING_OP_RECV;
349                 if (readId == 0) {
350                     return 0;
351                 }
352                 readBuffer = byteBuf;
353                 byteBuf = null;
354                 return 1;
355             } finally {
356                 if (byteBuf != null) {
357                     byteBuf.release();
358                 }
359             }
360         }
361 
362         private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
363             short bgId = bufferRing.bufferGroupId();
364             try {
365                 boolean multishot = IoUring.isRecvMultishotEnabled();
366                 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
367                 short ioPrio;
368                 final int recvFlags;
369                 if (multishot) {
370                     ioPrio = Native.IORING_RECV_MULTISHOT;
371                     recvFlags = 0;
372                 } else {
373                     // We should only use the calculate*() methods if this is not a multishot recv, as otherwise
374                     // the would be applied until the multishot will be re-armed.
375                     ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
376                     recvFlags = calculateRecvFlags(first);
377                 }
378                 if (IoUring.isRecvsendBundleEnabled()) {
379                     // See https://github.com/axboe/liburing/wiki/
380                     // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
381                     ioPrio |= Native.IORING_RECVSEND_BUNDLE;
382                 }
383                 IoRegistration registration = registration();
384                 int fd = fd().intValue();
385                 IoUringIoOps ops = IoUringIoOps.newRecv(
386                         fd, flags, ioPrio, recvFlags, 0,
387                         0, nextOpsId(), bgId
388                 );
389                 readId = registration.submit(ops);
390                 readOpCode = Native.IORING_OP_RECV;
391                 if (readId == 0) {
392                     return 0;
393                 }
394                 if (multishot) {
395                     // Return -1 to signal we used multishot and so expect multiple recvComplete(...) calls.
396                     return -1;
397                 }
398                 return 1;
399             } catch (IllegalArgumentException illegalArgumentException) {
400                 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
401                 return 0;
402             }
403         }
404 
405         @Override
406         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
407             ByteBuf byteBuf = readBuffer;
408             readBuffer = null;
409             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
410                 readId = 0;
411                 // In case of cancellation we should reset the last used buffer ring to null as we will select a new one
412                 // when calling scheduleRead(..)
413                 if (byteBuf != null) {
414                     //recv without buffer ring
415                     byteBuf.release();
416                 }
417                 return;
418             }
419             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
420             boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
421             short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
422             boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;
423 
424             boolean empty = socketIsEmpty(flags);
425             if (rearm) {
426                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
427                 readId = 0;
428             }
429 
430             boolean allDataRead = false;
431 
432             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
433             final ChannelPipeline pipeline = pipeline();
434 
435             try {
436                 if (res < 0) {
437                     if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
438                         // try to expand the buffer ring by adding more buffers to it if there is any space left.
439                         if (!bufferRing.expand()) {
440                             // We couldn't expand the ring anymore so notify the user that we did run out of buffers
441                             // without the ability to expand it.
442                             // If this happens to often the user should most likely increase the buffer ring size.
443                             pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
444                         }
445 
446                         // Let's trigger a read again without consulting the RecvByteBufAllocator.Handle as
447                         // we can't count this as a "real" read operation.
448                         // Because of how our BufferRing works we should have it filled again.
449                         scheduleRead(allocHandle.isFirstRead());
450                         return;
451                     }
452 
453                     // If res is negative we should pass it to ioResult(...) which will either throw
454                     // or convert it to 0 if we could not read because the socket was not readable.
455                     allocHandle.lastBytesRead(ioResult("io_uring read", res));
456                 } else if (res > 0) {
457                     if (useBufferRing) {
458                         // If RECVSEND_BUNDLE is used we need to do a bit more work here.
459                         // In this case we might need to obtain multiple buffers out of the buffer ring as
460                         // multiple of them might have been filled for one recv operation.
461                         // See https://github.com/axboe/liburing/wiki/
462                         // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
463                         int read = res;
464                         for (;;) {
465                             int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
466                             byteBuf = bufferRing.useBuffer(bid, read, more);
467                             read -= byteBuf.readableBytes();
468                             allocHandle.attemptedBytesRead(attemptedBytesRead);
469                             allocHandle.lastBytesRead(byteBuf.readableBytes());
470 
471                             assert read >= 0;
472                             if (read == 0) {
473                                 // Just break here, we will handle the byteBuf below and also fill the bufferRing
474                                 // if needed later.
475                                 break;
476                             }
477                             allocHandle.incMessagesRead(1);
478                             pipeline.fireChannelRead(byteBuf);
479                             byteBuf = null;
480                             bid = bufferRing.nextBid(bid);
481                             if (!allocHandle.continueReading()) {
482                                 // We should call fireChannelReadComplete() to mimic a normal read loop.
483                                 allocHandle.readComplete();
484                                 pipeline.fireChannelReadComplete();
485                                 allocHandle.reset(config());
486                             }
487                         }
488                     } else {
489                         int attemptedBytesRead = byteBuf.writableBytes();
490                         byteBuf.writerIndex(byteBuf.writerIndex() + res);
491                         allocHandle.attemptedBytesRead(attemptedBytesRead);
492                         allocHandle.lastBytesRead(res);
493                     }
494                 } else {
495                     // EOF which we signal with -1.
496                     allocHandle.lastBytesRead(-1);
497                 }
498                 if (allocHandle.lastBytesRead() <= 0) {
499                     // byteBuf might be null if we used a buffer ring.
500                     if (byteBuf != null) {
501                         // nothing was read, release the buffer.
502                         byteBuf.release();
503                         byteBuf = null;
504                     }
505                     allDataRead = allocHandle.lastBytesRead() < 0;
506                     if (allDataRead) {
507                         // There is nothing left to read as we received an EOF.
508                         shutdownInput(true);
509                     }
510                     allocHandle.readComplete();
511                     pipeline.fireChannelReadComplete();
512                     return;
513                 }
514 
515                 allocHandle.incMessagesRead(1);
516                 pipeline.fireChannelRead(byteBuf);
517                 byteBuf = null;
518                 scheduleNextRead(pipeline, allocHandle, rearm, empty);
519             } catch (Throwable t) {
520                 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
521             }
522         }
523 
524         private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
525                                       boolean rearm, boolean empty) {
526             if (allocHandle.continueReading() && !empty) {
527                 if (rearm) {
528                     // We only should schedule another read if we need to rearm.
529                     // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
530                     scheduleRead(false);
531                 }
532             } else {
533                 // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
534                 allocHandle.readComplete();
535                 pipeline.fireChannelReadComplete();
536             }
537         }
538 
539         protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
540                                          Throwable cause, boolean allDataRead,
541                                          IoUringRecvByteAllocatorHandle allocHandle) {
542             if (byteBuf != null) {
543                 if (byteBuf.isReadable()) {
544                     pipeline.fireChannelRead(byteBuf);
545                 } else {
546                     byteBuf.release();
547                 }
548             }
549             allocHandle.readComplete();
550             pipeline.fireChannelReadComplete();
551             pipeline.fireExceptionCaught(cause);
552             if (allDataRead || cause instanceof IOException) {
553                 shutdownInput(true);
554             }
555         }
556 
557         private boolean handleWriteCompleteFileRegion(ChannelOutboundBuffer channelOutboundBuffer,
558                                                       IoUringFileRegion fileRegion, int res, short data) {
559             try {
560                 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
561                     return true;
562                 }
563                 int result = res >= 0 ? res : ioResult("io_uring splice", res);
564                 if (result == 0 && fileRegion.count() > 0) {
565                     validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
566                     return false;
567                 }
568                 int progress = fileRegion.handleResult(result, data);
569                 if (progress == -1) {
570                     // Done with writing
571                     channelOutboundBuffer.remove();
572                 } else if (progress > 0) {
573                     channelOutboundBuffer.progress(progress);
574                 }
575             } catch (Throwable cause) {
576                 handleWriteError(cause);
577             }
578             return true;
579         }
580 
581         @Override
582         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
583             if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
584                 // We only want to reset these if IORING_CQE_F_NOTIF is not set.
585                 // If it's set we know this is only an extra notification for a write but we already handled
586                 // the write completions before.
587                 // See https://man7.org/linux/man-pages/man2/io_uring_enter.2.html section: IORING_OP_SEND_ZC
588                 writeId = 0;
589                 writeOpCode = 0;
590             }
591             ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
592             Object current = channelOutboundBuffer.current();
593             if (current instanceof IoUringFileRegion) {
594                 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
595                 return handleWriteCompleteFileRegion(channelOutboundBuffer, fileRegion, res, data);
596             }
597 
598             if (res >= 0) {
599                 channelOutboundBuffer.removeBytes(res);
600             } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
601                 return true;
602             } else {
603                 try {
604                     if (ioResult("io_uring write", res) == 0) {
605                         return false;
606                     }
607                 } catch (Throwable cause) {
608                     handleWriteError(cause);
609                 }
610             }
611             return true;
612         }
613 
614         @Override
615         protected void freeResourcesNow(IoRegistration reg) {
616             super.freeResourcesNow(reg);
617             assert readBuffer == null;
618         }
619     }
620 
621     @Override
622     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
623         if (readId != 0) {
624             // Let's try to cancel outstanding reads as these might be submitted and waiting for data (via fastpoll).
625             assert numOutstandingReads == 1 || numOutstandingReads == -1;
626             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
627             long id = registration.submit(ops);
628             assert id != 0;
629             readId = 0;
630         }
631     }
632 
633     @Override
634     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
635         if (writeId != 0) {
636             // Let's try to cancel outstanding writes as these might be submitted and waiting to finish writing
637             // (via fastpoll).
638             assert numOutstandingWrites == 1;
639             assert writeOpCode != 0;
640             long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
641             assert id != 0;
642             writeId = 0;
643         }
644     }
645 
646     @Override
647     protected boolean socketIsEmpty(int flags) {
648         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
649     }
650 
651     @Override
652     boolean isPollInFirst() {
653         return bufferRing == null || !bufferRing.isUsable();
654     }
655 }