View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.Errors;
43  import io.netty.channel.unix.FileDescriptor;
44  import io.netty.channel.unix.UnixChannel;
45  import io.netty.channel.unix.UnixChannelUtil;
46  import io.netty.util.ReferenceCountUtil;
47  import io.netty.util.internal.logging.InternalLogger;
48  import io.netty.util.internal.logging.InternalLoggerFactory;
49  
50  import java.io.IOException;
51  import java.net.InetSocketAddress;
52  import java.net.SocketAddress;
53  import java.nio.ByteBuffer;
54  import java.nio.channels.AlreadyConnectedException;
55  import java.nio.channels.ClosedChannelException;
56  import java.nio.channels.ConnectionPendingException;
57  import java.nio.channels.NotYetConnectedException;
58  import java.nio.channels.UnresolvedAddressException;
59  import java.util.concurrent.ScheduledFuture;
60  import java.util.concurrent.TimeUnit;
61  
62  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
63  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
64  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
65  import static io.netty.util.internal.ObjectUtil.checkNotNull;
66  
67  
68  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
    // The socket backing this channel; returned by fd().
    final LinuxSocket socket;
    // Set when the channel becomes active (constructor / successful connect) and cleared in doClose().
    // volatile so reads from other threads observe the latest write.
    protected volatile boolean active;

    // Different masks for outstanding I/O operations. Each scheduled operation sets its bit in ioState and the bit
    // is cleared again once the corresponding completion was handled.
    // NOTE(review): the bit 1 << 1 is not used by any mask — presumably intentional; confirm before reusing it.
    private static final int POLL_IN_SCHEDULED = 1;
    private static final int POLL_OUT_SCHEDULED = 1 << 2;
    private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
    private static final int WRITE_SCHEDULED = 1 << 4;
    private static final int READ_SCHEDULED = 1 << 5;
    private static final int CONNECT_SCHEDULED = 1 << 6;

    // Source of the ids handed out by nextOpsId(); starts at Short.MIN_VALUE and advances on every call.
    private short opsId = Short.MIN_VALUE;

    // Ids returned by registration.submit(...) for the in-flight poll / connect requests. They are needed to
    // remove or cancel those requests again (see cancelOps(...)); 0 means "nothing in flight".
    private long pollInId;
    private long pollOutId;
    private long pollRdhupId;
    private long connectId;

    // Bit set of the *_SCHEDULED masks above.
    // A byte is enough for now.
    private byte ioState;

    // It's possible that multiple read / writes are issued. We need to keep track of these.
    // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
    // enough but let's be a bit more flexible for now.
    private short numOutstandingWrites;
    private short numOutstandingReads;

    // true if doBeginRead() was called while a read could not be started right away.
    private boolean readPending;
    // true while readComplete0(...) is being executed.
    private boolean inReadComplete;

    // Set once ChannelInputShutdownReadComplete was fired; doBeginRead() does not start new reads afterwards.
    private boolean inputClosedSeenErrorOnRead;

    /**
     * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;
    // Released via Buffer.free(...) in freeRemoteAddressMemory() once the connect completed or the channel is freed.
    // NOTE(review): presumably a direct (off-heap) buffer — confirm where it is allocated.
    private ByteBuffer remoteAddressMemory;
    // Released in freeMsgHdrArray() once the connect completed or the channel is freed.
    private MsgHdrMemoryArray msgHdrMemoryArray;

    // null (or invalid) means the channel was never registered with io_uring, see doClose().
    private IoUringIoRegistration registration;

    // Cached local / remote addresses; volatile so reads from other threads see the latest values.
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
115 
116     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
117         super(parent);
118         this.socket = checkNotNull(socket, "fd");
119 
120         if (active) {
121             // Directly cache the remote and local addresses
122             // See https://github.com/netty/netty/issues/2359
123             this.active = true;
124             this.local = socket.localAddress();
125             this.remote = socket.remoteAddress();
126         }
127 
128         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
129     }
130 
131     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
132         super(parent);
133         this.socket = checkNotNull(fd, "fd");
134         this.active = true;
135 
136         // Directly cache the remote and local addresses
137         // See https://github.com/netty/netty/issues/2359
138         this.remote = remote;
139         this.local = fd.localAddress();
140     }
141 
142     /**
143      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
144      *
145      * @return  opsId
146      */
147     protected final short nextOpsId() {
148         short id = opsId++;
149 
150         // We use 0 for "none".
151         if (id == 0) {
152             id = opsId++;
153         }
154         return id;
155     }
156 
157     public final boolean isOpen() {
158         return socket.isOpen();
159     }
160 
161     @Override
162     public boolean isActive() {
163         return active;
164     }
165 
166     @Override
167     public final FileDescriptor fd() {
168         return socket;
169     }
170 
171     private AbstractUringUnsafe ioUringUnsafe() {
172         return (AbstractUringUnsafe) unsafe();
173     }
174 
175     @Override
176     protected boolean isCompatible(final EventLoop loop) {
177         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
178     }
179 
180     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
181         return newDirectBuffer(buf, buf);
182     }
183 
184     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
185         final int readableBytes = buf.readableBytes();
186         if (readableBytes == 0) {
187             ReferenceCountUtil.release(holder);
188             return Unpooled.EMPTY_BUFFER;
189         }
190 
191         final ByteBufAllocator alloc = alloc();
192         if (alloc.isDirectBufferPooled()) {
193             return newDirectBuffer0(holder, buf, alloc, readableBytes);
194         }
195 
196         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
197         if (directBuf == null) {
198             return newDirectBuffer0(holder, buf, alloc, readableBytes);
199         }
200 
201         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
202         ReferenceCountUtil.safeRelease(holder);
203         return directBuf;
204     }
205 
206     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
207         final ByteBuf directBuf = alloc.directBuffer(capacity);
208         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
209         ReferenceCountUtil.safeRelease(holder);
210         return directBuf;
211     }
212 
    /**
     * Cancel all outstanding reads.
     * <p>
     * Invoked from the unsafe's {@code cancelOps(boolean)} while the channel is being closed, with the current
     * number of reads that were submitted but have not completed yet.
     *
     * @param registration          the {@link IoUringIoRegistration}.
     * @param numOutstandingReads   the number of outstanding reads.
     */
    protected abstract void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads);
220 
    /**
     * Cancel all outstanding writes.
     * <p>
     * Invoked from the unsafe's {@code cancelOps(boolean)} while the channel is being closed, with the current
     * number of writes that were submitted but have not completed yet.
     *
     * @param registration          the {@link IoUringIoRegistration}.
     * @param numOutstandingWrites  the number of outstanding writes.
     */
    protected abstract void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites);
228 
    /**
     * Intentionally a no-op at this level.
     * NOTE(review): presumably subclasses for which a disconnect has a meaning override this — confirm.
     */
    @Override
    protected void doDisconnect() throws Exception {
    }
232 
233     private void freeRemoteAddressMemory() {
234         if (remoteAddressMemory != null) {
235             Buffer.free(remoteAddressMemory);
236             remoteAddressMemory = null;
237         }
238     }
239 
240     private void freeMsgHdrArray() {
241         if (msgHdrMemoryArray != null) {
242             msgHdrMemoryArray.release();
243             msgHdrMemoryArray = null;
244         }
245     }
246 
    /**
     * Closes the channel.
     * <p>
     * When the channel has a valid {@link IoUringIoRegistration}, a pending connect promise is failed, all
     * in-flight operations are cancelled and the fd itself is closed by submitting an {@code IORING_OP_CLOSE}.
     * The remaining resources are freed by the unsafe's {@code handle(...)} once that close completed and no
     * operation is tracked in {@code ioState} anymore. Otherwise the socket is closed directly and the resources
     * are freed right away.
     */
    @Override
    protected void doClose() throws Exception {
        active = false;

        if (registration != null && registration.isValid()) {
            // NOTE(review): markClosed() presumably returns true only for the first caller, so the close below is
            // submitted at most once — confirm in LinuxSocket.
            if (socket.markClosed()) {
                // Whether a connect was still pending and so must be cancelled as well.
                boolean cancelConnect = false;
                try {
                    ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                    if (connectPromise != null) {
                        // Use tryFailure() instead of setFailure() to avoid the race against cancel().
                        connectPromise.tryFailure(new ClosedChannelException());
                        AbstractIoUringChannel.this.connectPromise = null;
                        cancelConnect = true;
                    }

                    cancelConnectTimeoutFuture();
                } finally {
                    // It's important we cancel all outstanding connect, write and read operations now so
                    // we will be able to process a delayed close if needed.
                    ioUringUnsafe().cancelOps(cancelConnect);
                }

                // Close the fd via io_uring; the completion is handled as IORING_OP_CLOSE in handle(...).
                int fd = fd().intValue();
                IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
                registration.submit(ops);
            }
        } else {
            // This one was never registered, so just use a syscall to close.
            socket.close();
            ioUringUnsafe().freeResourcesNowIfNeeded(null);
        }
    }
280 
281     @Override
282     protected final void doBeginRead() {
283         if (inputClosedSeenErrorOnRead) {
284             // We did see an error while reading and so closed the input. Stop reading.
285             return;
286         }
287         if (readPending) {
288             // We already have a read pending.
289             return;
290         }
291         if (inReadComplete || !isActive()) {
292             // We are currently in the readComplete(...) callback which might issue more reads by itself. Just
293             // mark it as a pending read. If readComplete(...) will not issue more reads itself it will pick up
294             // the readPending flag, reset it and call doBeginReadNow().
295             readPending = true;
296             return;
297         }
298         doBeginReadNow();
299     }
300 
301     private void doBeginReadNow() {
302         if (socket.isBlocking()) {
303             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
304             ioUringUnsafe().scheduleFirstReadIfNeeded();
305         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
306             ioUringUnsafe().schedulePollIn();
307         }
308     }
309 
310     @Override
311     protected void doWrite(ChannelOutboundBuffer in) {
312         if ((ioState & WRITE_SCHEDULED) != 0) {
313             return;
314         }
315         if (scheduleWrite(in) > 0) {
316             ioState |= WRITE_SCHEDULED;
317         }
318     }
319 
320     private int scheduleWrite(ChannelOutboundBuffer in) {
321         if (numOutstandingWrites == Short.MAX_VALUE) {
322             return 0;
323         }
324         if (in == null) {
325             return 0;
326         }
327 
328         int msgCount = in.size();
329         if (msgCount == 0) {
330             return 0;
331         }
332         Object msg = in.current();
333 
334         if (msgCount > 1 && in.current() instanceof ByteBuf) {
335             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
336         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
337                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
338             // We also need some special handling for CompositeByteBuf
339             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
340         } else {
341             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
342         }
343         // Ensure we never overflow
344         assert numOutstandingWrites > 0;
345         return numOutstandingWrites;
346     }
347 
348     protected final IoUringIoRegistration registration() {
349         assert registration != null;
350         return registration;
351     }
352 
353     private void schedulePollOut() {
354         // This should only be done if the socket is non-blocking.
355         assert !socket.isBlocking();
356         pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT);
357     }
358 
359     final void schedulePollRdHup() {
360         pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP);
361     }
362 
363     private long schedulePollAdd(int ioMask, int mask) {
364         assert (ioState & ioMask) == 0;
365         int fd = fd().intValue();
366         IoUringIoRegistration registration = registration();
367         IoUringIoOps ops = IoUringIoOps.newPollAdd(
368                 fd, (byte) 0, mask, (short) mask);
369         long id = registration.submit(ops);
370         if (id != 0) {
371             ioState |= ioMask;
372         }
373         return id;
374     }
375 
376     final void resetCachedAddresses() {
377         local = socket.localAddress();
378         remote = socket.remoteAddress();
379     }
380 
381     abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
        // Created lazily by recvBufAllocHandle().
        private IoUringRecvByteAllocatorHandle allocHandle;

        /**
         * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         * The caller narrows the result to a {@code short} and asserts that it is greater than {@code 0}.
         */
        protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);

        /**
         * Schedule the write of a single message and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         * The caller narrows the result to a {@code short} and asserts that it is greater than {@code 0}.
         */
        protected abstract int scheduleWriteSingle(Object msg);

        // Set once a completion for the submitted IORING_OP_CLOSE arrived that was not cancelled.
        private boolean closed;
        // Ensures freeResourcesNow(...) runs at most once.
        private boolean freed;
398 
399         @Override
400         public final void handle(IoRegistration registration, IoEvent ioEvent) {
401             IoUringIoRegistration reg = (IoUringIoRegistration) registration;
402             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
403             byte op = event.opcode();
404             int res = event.res();
405             int flags = event.flags();
406             short data = event.data();
407             switch (op) {
408                 case Native.IORING_OP_RECV:
409                 case Native.IORING_OP_ACCEPT:
410                 case Native.IORING_OP_RECVMSG:
411                 case Native.IORING_OP_READ:
412                     readComplete(op, res, flags, data);
413                     break;
414                 case Native.IORING_OP_WRITEV:
415                 case Native.IORING_OP_SEND:
416                 case Native.IORING_OP_SENDMSG:
417                 case Native.IORING_OP_WRITE:
418                 case Native.IORING_OP_SPLICE:
419                     writeComplete(op, res, flags, data);
420                     break;
421                 case Native.IORING_OP_POLL_ADD:
422                     pollAddComplete(res, flags, data);
423                     break;
424                 case Native.IORING_OP_POLL_REMOVE:
425                     // fd reuse can replace a closed channel (with pending POLL_REMOVE CQEs)
426                     // with a new one: better not making it to mess-up with its state!
427                     if (res == 0) {
428                         clearPollFlag(data);
429                     }
430                     break;
431                 case Native.IORING_OP_ASYNC_CANCEL:
432                     cancelComplete0(op, res, flags, data);
433                     break;
434                 case Native.IORING_OP_CONNECT:
435                     connectComplete(op, res, flags, data);
436 
437                     // once the connect was completed we can also free some resources that are not needed anymore.
438                     freeMsgHdrArray();
439                     freeRemoteAddressMemory();
440                     break;
441                 case Native.IORING_OP_CLOSE:
442                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
443                         closed = true;
444                     }
445                     break;
446             }
447 
448             if (ioState == 0 && closed) {
449                 freeResourcesNowIfNeeded(reg);
450             }
451         }
452 
453         private void freeResourcesNowIfNeeded(IoUringIoRegistration reg) {
454             if (!freed) {
455                 freed = true;
456                 freeResourcesNow(reg);
457             }
458         }
459 
460         /**
461          * Free all resources now. No new IO will be submitted for this channel via io_uring
462          *
463          * @param reg   the {@link IoUringIoRegistration} or {@code null} if it was never registered
464          */
465         protected void freeResourcesNow(IoUringIoRegistration reg) {
466             freeMsgHdrArray();
467             freeRemoteAddressMemory();
468             if (reg != null) {
469                 reg.cancel();
470             }
471         }
472 
473         private void pollAddComplete(int res, int flags, short data) {
474             if ((data & Native.POLLOUT) != 0) {
475                 pollOut(res);
476             }
477             if ((data & Native.POLLIN) != 0) {
478                 pollIn(res);
479             }
480             if ((data & Native.POLLRDHUP) != 0) {
481                 pollRdHup(res);
482             }
483         }
484 
485         @Override
486         public final void close() throws Exception {
487             close(voidPromise());
488         }
489 
490         private void cancelOps(boolean cancelConnect) {
491             if (registration == null || !registration.isValid()) {
492                 return;
493             }
494             int fd = fd().intValue();
495             if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
496                 registration.submit(IoUringIoOps.newPollRemove(
497                         fd, (byte) 0, pollRdhupId, (short) Native.POLLRDHUP));
498             }
499             if ((ioState & POLL_IN_SCHEDULED) != 0) {
500                 registration.submit(IoUringIoOps.newPollRemove(
501                         fd, (byte) 0, pollInId, (short) Native.POLLIN));
502             }
503             if ((ioState & POLL_OUT_SCHEDULED) != 0) {
504                 registration.submit(IoUringIoOps.newPollRemove(
505                         fd,  (byte) 0, pollOutId, (short) Native.POLLOUT));
506             }
507             if (cancelConnect && connectId != 0) {
508                 // Best effort to cancel the already submitted connect request.
509                 registration.submit(IoUringIoOps.newAsyncCancel(
510                         fd, (byte) 0, connectId, Native.IORING_OP_CONNECT));
511             }
512             cancelOutstandingReads(registration, numOutstandingReads);
513             cancelOutstandingWrites(registration, numOutstandingWrites);
514         }
515 
516         @Override
517         protected final void flush0() {
518             // Flush immediately only when there's no pending flush.
519             // If there's a pending flush operation, event loop will call forceFlush() later,
520             // and thus there's no need to call it now.
521             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
522                 super.flush0();
523             }
524         }
525 
526         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
527             if (promise == null) {
528                 // Closed via cancellation and the promise has been notified already.
529                 return;
530             }
531 
532             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
533             promise.tryFailure(cause);
534             closeIfClosed();
535         }
536 
537         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
538             if (promise == null) {
539                 // Closed via cancellation and the promise has been notified already.
540                 return;
541             }
542             active = true;
543 
544             if (local == null) {
545                 local = socket.localAddress();
546             }
547             computeRemote();
548 
549             // Register POLLRDHUP
550             schedulePollRdHup();
551 
552             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
553             // We still need to ensure we call fireChannelActive() in this case.
554             boolean active = isActive();
555 
556             // trySuccess() will return false if a user cancelled the connection attempt.
557             boolean promiseSet = promise.trySuccess();
558 
559             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
560             // because what happened is what happened.
561             if (!wasActive && active) {
562                 pipeline().fireChannelActive();
563             }
564 
565             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
566             if (!promiseSet) {
567                 close(voidPromise());
568             }
569         }
570 
571         @Override
572         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
573             if (allocHandle == null) {
574                 allocHandle = new IoUringRecvByteAllocatorHandle(
575                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
576             }
577             return allocHandle;
578         }
579 
        /**
         * Handles the end of the input side of this channel.
         * <p>
         * If the input is not shut down yet, there are two cases:
         * <ul>
         *     <li>Half-closure allowed: the socket input is shut down and {@link ChannelInputShutdownEvent} is
         *     fired. If that shutdown throws an {@link IOException}, the event is fired, the channel is closed,
         *     and the method returns.</li>
         *     <li>Half-closure not allowed: the channel is closed and the method returns.</li>
         * </ul>
         * If the method did not return early and {@code allDataRead} is {@code true},
         * {@link ChannelInputShutdownReadComplete} is fired (at most once per channel).
         *
         * @param allDataRead NOTE(review): presumably {@code true} when the caller knows all input was drained —
         *                    confirm with the callers.
         */
        final void shutdownInput(boolean allDataRead) {
            logger.trace("shutdownInput Fd: {}", fd().intValue());
            if (!socket.isInputShutdown()) {
                if (isAllowHalfClosure(config())) {
                    try {
                        socket.shutdown(true, false);
                    } catch (IOException ignored) {
                        // We attempted to shutdown and failed, which means the input has already effectively been
                        // shutdown.
                        fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
                        return;
                    } catch (NotYetConnectedException ignore) {
                        // We attempted to shutdown and failed, which means the input has already effectively been
                        // shutdown.
                    }
                    pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                } else {
                    close(voidPromise());
                    return;
                }
            }
            // inputClosedSeenErrorOnRead doubles as the "already fired" guard and also stops doBeginRead().
            if (allDataRead && !inputClosedSeenErrorOnRead) {
                inputClosedSeenErrorOnRead = true;
                pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
            }
        }
606 
607         private void fireEventAndClose(Object evt) {
608             pipeline().fireUserEventTriggered(evt);
609             close(voidPromise());
610         }
611 
612         final void schedulePollIn() {
613             // This should only be done if the socket is non-blocking.
614             assert !socket.isBlocking();
615 
616             assert (ioState & POLL_IN_SCHEDULED) == 0;
617             if (!isActive() || shouldBreakIoUringInReady(config())) {
618                 return;
619             }
620             pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN);
621         }
622 
        /**
         * Handles the completion of one submitted read and delegates to
         * {@link #readComplete0(byte, int, int, short, int)}.
         * <p>
         * Once the last outstanding read completed, {@code readPending} and the READ_SCHEDULED bit are cleared.
         * While readComplete0(...) runs, {@code inReadComplete} is set so doBeginRead() only records the request.
         * Afterwards a recorded request is started if the allocator handle reports the read loop as complete.
         */
        private void readComplete(byte op, int res, int flags, short data) {
            assert numOutstandingReads > 0;
            if (--numOutstandingReads == 0) {
                readPending = false;
                ioState &= ~READ_SCHEDULED;
            }
            inReadComplete = true;
            try {
                readComplete0(op, res, flags, data, numOutstandingReads);
            } finally {
                inReadComplete = false;
                // There is a pending read and readComplete0(...) did not issue another read request itself.
                // Let's trigger the requested read now.
                if (readPending && recvBufAllocHandle().isReadComplete()) {
                    doBeginReadNow();
                }
            }
        }
641 
        /**
         * Called once a read was completed.
         *
         * @param op                    the op code of the completed read.
         * @param res                   the result of the read.
         * @param flags                 the completion flags.
         * @param data                  the data that was passed when submitting the op.
         * @param outstandingCompletes  the number of reads still outstanding, already decremented for this
         *                              completion.
         */
        protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
646 
647         /**
648          * Called once POLLRDHUP event is ready to be processed
649          */
650         private void pollRdHup(int res) {
651             ioState &= ~POLL_RDHUP_SCHEDULED;
652             pollRdhupId = 0;
653             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
654                 return;
655             }
656 
657             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
658             recvBufAllocHandle().rdHupReceived();
659 
660             if (isActive()) {
661                 scheduleFirstReadIfNeeded();
662             } else {
663                 // Just to be safe make sure the input marked as closed.
664                 shutdownInput(false);
665             }
666         }
667 
668         /**
669          * Called once POLLIN event is ready to be processed
670          */
671         private void pollIn(int res) {
672             ioState &= ~POLL_IN_SCHEDULED;
673             pollInId = 0;
674             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
675                 return;
676             }
677 
678             scheduleFirstReadIfNeeded();
679         }
680 
681         private void scheduleFirstReadIfNeeded() {
682             if ((ioState & READ_SCHEDULED) == 0) {
683                 scheduleFirstRead();
684             }
685         }
686 
687         private void scheduleFirstRead() {
688             // This is a new "read loop" so we need to reset the allocHandle.
689             final ChannelConfig config = config();
690             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
691             allocHandle.reset(config);
692             scheduleRead(true);
693         }
694 
695         protected final void scheduleRead(boolean first) {
696             // Only schedule another read if the fd is still open.
697             if (fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
698                 numOutstandingReads = (short) scheduleRead0(first);
699                 if (numOutstandingReads > 0) {
700                     ioState |= READ_SCHEDULED;
701                 }
702             }
703         }
704 
        /**
         * Schedule a read and returns the number of {@link #readComplete(byte, int, int, short)}
         * calls that are expected because of the scheduled read.
         *
         * @param first {@code true} if this is the first read of a read loop.
         * @return the number of expected read completions; stored as a {@code short} by the caller, and a value
         *         {@code <= 0} means no read was scheduled.
         */
        protected abstract int scheduleRead0(boolean first);
712 
        /**
         * Called once POLLOUT event is ready to be processed
         * <p>
         * While a connect is pending ({@code connectPromise != null}), this tries to finish the connect and
         * completes the promise. If the connect is still in progress, POLLOUT is registered again. Without a
         * pending connect, the outbound buffer is flushed again unless the output was shut down.
         *
         * @param res   the result.
         */
        private void pollOut(int res) {
            ioState &= ~POLL_OUT_SCHEDULED;
            pollOutId = 0;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                // The poll was cancelled, not triggered.
                return;
            }
            // pending connect
            if (connectPromise != null) {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.

                assert eventLoop().inEventLoop();

                boolean connectStillInProgress = false;
                try {
                    boolean wasActive = isActive();
                    if (!socket.finishConnect()) {
                        // The finally block below re-registers POLLOUT in this case.
                        connectStillInProgress = true;
                        return;
                    }
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                    if (!connectStillInProgress) {
                        // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0
                        // is used
                        // See https://github.com/netty/netty/issues/1770
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                    } else {
                        // This should only happen if the socket is non-blocking.
                        assert !socket.isBlocking();

                        // The connect was not done yet, register for POLLOUT again
                        schedulePollOut();
                    }
                }
            } else if (!socket.isOutputShutdown()) {
                // Try writing again
                super.flush0();
            }
        }
761 
762         /**
763          * Called once a write was completed.
764          *
765          * @param op    the op code.
766          * @param res   the result.
767          * @param flags the flags.
768          * @param data  the data that was passed when submitting the op.
769          */
770         private void writeComplete(byte op, int res, int flags, short data) {
771             if ((ioState & CONNECT_SCHEDULED) != 0) {
772                 // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
773                 // TCP_FASTOPEN_CONNECT.
774                 freeMsgHdrArray();
775                 if (res > 0) {
776                     // Connect complete!
777                     outboundBuffer().removeBytes(res);
778 
779                     // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
780                     connectComplete(op, 0, flags, data);
781                 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
782                     // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
783                     // In this case, our TCP connection will be established normally, but no data was transmitted at
784                     // this time. We'll just transmit the data with normal writes later.
785                     // Let's submit a normal connect.
786                     submitConnect((InetSocketAddress) requestedRemoteAddress);
787                 } else {
788                     // There was an error, handle it as a normal connect error.
789                     connectComplete(op, res, flags, data);
790                 }
791                 return;
792             }
793             assert numOutstandingWrites > 0;
794             --numOutstandingWrites;
795 
796             boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
797             if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
798                 // This should only happen if the socket is non-blocking.
799                 assert !socket.isBlocking();
800 
801                 // We were not able to write everything, let's register for POLLOUT
802                 schedulePollOut();
803             }
804 
805             // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
806             // while still removing messages internally in removeBytes(...) which then may corrupt state.
807             if (numOutstandingWrites == 0) {
808                 ioState &= ~WRITE_SCHEDULED;
809 
810                 // If we could write all and we did not schedule a pollout yet let us try to write again
811                 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
812                     doWrite(unsafe().outboundBuffer());
813                 }
814             }
815         }
816 
817         /**
818          * Called once a write was completed.
819          * @param op            the op code
820          * @param res           the result.
821          * @param flags         the flags.
822          * @param data          the data that was passed when submitting the op.
823          * @param outstanding   the outstanding write completions.
824          */
825         abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
826 
827         /**
828          * Called once a cancel was completed.
829          *
830          * @param op            the op code
831          * @param res           the result.
832          * @param flags         the flags.
833          * @param data          the data that was passed when submitting the op.
834          */
835         void cancelComplete0(byte op, int res, int flags, short data) {
836             // NOOP
837         }
838 
839         /**
840          * Called once a connect was completed.
841          * @param op            the op code.
842          * @param res           the result.
843          * @param flags         the flags.
844          * @param data          the data that was passed when submitting the op.
845          */
846         void connectComplete(byte op, int res, int flags, short data) {
847             ioState &= ~CONNECT_SCHEDULED;
848             freeRemoteAddressMemory();
849 
850             if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
851                 // This should only happen if the socket is non-blocking.
852                 assert !socket.isBlocking();
853 
854                 // connect not complete yet need to wait for poll_out event
855                 schedulePollOut();
856             } else {
857                 try {
858                     if (res == 0) {
859                         fulfillConnectPromise(connectPromise, active);
860                         if (readPending) {
861                             doBeginReadNow();
862                         }
863                     } else {
864                         try {
865                             Errors.throwConnectException("io_uring connect", res);
866                         } catch (Throwable cause) {
867                             fulfillConnectPromise(connectPromise, cause);
868                         }
869                     }
870                 } finally {
871                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
872                     // used
873                     // See https://github.com/netty/netty/issues/1770
874                     cancelConnectTimeoutFuture();
875                     connectPromise = null;
876                 }
877             }
878         }
879 
880         @Override
881         public void connect(
882                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
883             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
884             // non-blocking io.
885             if (promise.isDone() || !ensureOpen(promise)) {
886                 return;
887             }
888 
889             if (!isOpen()) {
890                 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
891                 return;
892             }
893             try {
894                 if (connectPromise != null) {
895                     throw new ConnectionPendingException();
896                 }
897                 if (localAddress instanceof InetSocketAddress) {
898                     checkResolvable((InetSocketAddress) localAddress);
899                 }
900 
901                 if (remoteAddress instanceof InetSocketAddress) {
902                     checkResolvable((InetSocketAddress) remoteAddress);
903                 }
904 
905                 if (remote != null) {
906                     // Check if already connected before trying to connect. This is needed as connect(...) will not#
907                     // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
908                     // EINPROGRESS and finished later.
909                     throw new AlreadyConnectedException();
910                 }
911 
912                 if (localAddress != null) {
913                     socket.bind(localAddress);
914                 }
915 
916                 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
917 
918                 ByteBuf initialData = null;
919                 if (IoUring.isTcpFastOpenClientSideAvailable() &&
920                         config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
921                     ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
922                     outbound.addFlush();
923                     Object curr;
924                     if ((curr = outbound.current()) instanceof ByteBuf) {
925                         initialData = (ByteBuf) curr;
926                     }
927                 }
928                 if (initialData != null) {
929                     msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
930                     MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
931                     hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
932                             initialData.readableBytes(), (short) 0);
933 
934                     int fd = fd().intValue();
935                     IoUringIoRegistration registration = registration();
936                     IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
937                             hdr.address(), hdr.idx());
938                     connectId = registration.submit(ops);
939                     if (connectId == 0) {
940                         // Directly release the memory if submitting failed.
941                         freeMsgHdrArray();
942                     }
943                 } else {
944                     submitConnect(inetSocketAddress);
945                 }
946                 if (connectId != 0) {
947                     ioState |= CONNECT_SCHEDULED;
948                 }
949             } catch (Throwable t) {
950                 closeIfClosed();
951                 promise.tryFailure(annotateConnectException(t, remoteAddress));
952                 return;
953             }
954             connectPromise = promise;
955             requestedRemoteAddress = remoteAddress;
956             // Schedule connect timeout.
957             int connectTimeoutMillis = config().getConnectTimeoutMillis();
958             if (connectTimeoutMillis > 0) {
959                 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
960                     @Override
961                     public void run() {
962                         ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
963                         if (connectPromise != null && !connectPromise.isDone() &&
964                                 connectPromise.tryFailure(new ConnectTimeoutException(
965                                         "connection timed out: " + remoteAddress))) {
966                             close(voidPromise());
967                         }
968                     }
969                 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
970             }
971 
972             promise.addListener(new ChannelFutureListener() {
973                 @Override
974                 public void operationComplete(ChannelFuture future) {
975                     // If the connect future is cancelled we also cancel the timeout and close the
976                     // underlying socket.
977                     if (future.isCancelled()) {
978                         cancelConnectTimeoutFuture();
979                         connectPromise = null;
980                         close(voidPromise());
981                     }
982                 }
983             });
984         }
985     }
986 
987     private void submitConnect(InetSocketAddress inetSocketAddress) {
988         remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
989         long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
990 
991         SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
992 
993         int fd = fd().intValue();
994         IoUringIoRegistration registration = registration();
995         IoUringIoOps ops = IoUringIoOps.newConnect(
996                 fd, (byte) 0, remoteAddressMemoryAddress, nextOpsId());
997         connectId = registration.submit(ops);
998         if (connectId == 0) {
999             // Directly release the memory if submitting failed.
1000             freeRemoteAddressMemory();
1001         }
1002     }
1003 
1004     @Override
1005     protected Object filterOutboundMessage(Object msg) {
1006         if (msg instanceof ByteBuf) {
1007             ByteBuf buf = (ByteBuf) msg;
1008             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1009         }
1010         throw new UnsupportedOperationException("unsupported message type");
1011     }
1012 
1013     @Override
1014     protected void doRegister(ChannelPromise promise) {
1015         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1016         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1017             if (f.isSuccess()) {
1018                 registration = (IoUringIoRegistration) f.getNow();
1019                 promise.setSuccess();
1020             } else {
1021                 promise.setFailure(f.cause());
1022             }
1023         });
1024     }
1025 
1026     @Override
1027     protected final void doDeregister() {
1028         // Cancel all previous submitted ops.
1029         ioUringUnsafe().cancelOps(connectPromise != null);
1030     }
1031 
1032     @Override
1033     protected void doBind(final SocketAddress local) throws Exception {
1034         if (local instanceof InetSocketAddress) {
1035             checkResolvable((InetSocketAddress) local);
1036         }
1037         socket.bind(local);
1038         this.local = socket.localAddress();
1039     }
1040 
1041     protected static void checkResolvable(InetSocketAddress addr) {
1042         if (addr.isUnresolved()) {
1043             throw new UnresolvedAddressException();
1044         }
1045     }
1046 
1047     @Override
1048     protected final SocketAddress localAddress0() {
1049         return local;
1050     }
1051 
1052     @Override
1053     protected final SocketAddress remoteAddress0() {
1054         return remote;
1055     }
1056 
1057     private static boolean isAllowHalfClosure(ChannelConfig config) {
1058         return config instanceof SocketChannelConfig &&
1059                ((SocketChannelConfig) config).isAllowHalfClosure();
1060     }
1061 
1062     private void cancelConnectTimeoutFuture() {
1063         if (connectTimeoutFuture != null) {
1064             connectTimeoutFuture.cancel(false);
1065             connectTimeoutFuture = null;
1066         }
1067     }
1068 
1069     private void computeRemote() {
1070         if (requestedRemoteAddress instanceof InetSocketAddress) {
1071             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1072         }
1073     }
1074 
1075     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1076         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1077     }
1078 
1079     private void clearPollFlag(int pollMask) {
1080         if ((pollMask & Native.POLLIN) != 0) {
1081             ioState &= ~POLL_IN_SCHEDULED;
1082             pollInId = 0;
1083         }
1084         if ((pollMask & Native.POLLOUT) != 0) {
1085             ioState &= ~POLL_OUT_SCHEDULED;
1086             pollOutId = 0;
1087         }
1088         if ((pollMask & Native.POLLRDHUP) != 0) {
1089             ioState &= ~POLL_RDHUP_SCHEDULED;
1090             pollRdhupId = 0;
1091         }
1092      }
1093 }