View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.Errors;
43  import io.netty.channel.unix.FileDescriptor;
44  import io.netty.channel.unix.UnixChannel;
45  import io.netty.channel.unix.UnixChannelUtil;
46  import io.netty.util.ReferenceCountUtil;
47  import io.netty.util.concurrent.PromiseNotifier;
48  import io.netty.util.internal.logging.InternalLogger;
49  import io.netty.util.internal.logging.InternalLoggerFactory;
50  
51  import java.io.IOException;
52  import java.net.InetSocketAddress;
53  import java.net.SocketAddress;
54  import java.nio.ByteBuffer;
55  import java.nio.channels.AlreadyConnectedException;
56  import java.nio.channels.ClosedChannelException;
57  import java.nio.channels.ConnectionPendingException;
58  import java.nio.channels.NotYetConnectedException;
59  import java.nio.channels.UnresolvedAddressException;
60  import java.util.concurrent.ScheduledFuture;
61  import java.util.concurrent.TimeUnit;
62  
63  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
64  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
65  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
66  import static io.netty.util.internal.ObjectUtil.checkNotNull;
67  
68  
69  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
    // The socket backing this channel; null-checked in the constructors.
    final LinuxSocket socket;
    // Value returned by isActive(); set in the constructors and on successful connect, cleared in doClose().
    protected volatile boolean active;

    // Bit masks stored in ioState, one per kind of outstanding I/O operation.
    // Note that no mask is declared for 1 << 1 in this group.
    private static final int POLL_IN_SCHEDULED = 1;
    private static final int POLL_OUT_SCHEDULED = 1 << 2;
    private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
    private static final int WRITE_SCHEDULED = 1 << 4;
    private static final int READ_SCHEDULED = 1 << 5;
    private static final int CONNECT_SCHEDULED = 1 << 6;

    // Counter backing nextOpsId(); the value 0 is skipped there because it means "none".
    private short opsId = Short.MIN_VALUE;

    // Ids returned by IoUringIoRegistration.submit(...) for the respective poll operations.
    private long pollInId;
    private long pollOutId;
    private long pollRdhupId;
    // Id of the submitted connect; a value of 0 is treated as "no connect submitted" when closing.
    private long connectId;

    // Bitwise OR of the *_SCHEDULED masks above. A byte is enough for now.
    private byte ioState;

    // It's possible that multiple read / writes are issued. We need to keep track of these.
    // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
    // enough but let's be a bit more flexible for now.
    private short numOutstandingWrites;
    private short numOutstandingReads;

    // True when a read was requested but could not be started right away (see doBeginRead()).
    private boolean readPending;
    // True while readComplete0(...) is being executed.
    private boolean inReadComplete;

    // Promise of the first close(...) call; non-null once a close was requested.
    private ChannelPromise delayedClose;
    // Set by shutdownInput(true); once set doBeginRead() will not start any further reads.
    private boolean inputClosedSeenErrorOnRead;

    /**
     * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;
    // Native memory released via Buffer.free(...) in freeRemoteAddressMemory().
    private ByteBuffer remoteAddressMemory;
    // Released in freeMsgHdrArray().
    private MsgHdrMemoryArray msgHdrMemoryArray;

    // null as long as the channel was never registered (see doClose()).
    private IoUringIoRegistration registration;

    // Cached addresses, see resetCachedAddresses().
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
117 
118     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
119         super(parent);
120         this.socket = checkNotNull(socket, "fd");
121 
122         if (active) {
123             // Directly cache the remote and local addresses
124             // See https://github.com/netty/netty/issues/2359
125             this.active = true;
126             this.local = socket.localAddress();
127             this.remote = socket.remoteAddress();
128         }
129 
130         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
131     }
132 
133     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
134         super(parent);
135         this.socket = checkNotNull(fd, "fd");
136         this.active = true;
137 
138         // Directly cache the remote and local addresses
139         // See https://github.com/netty/netty/issues/2359
140         this.remote = remote;
141         this.local = fd.localAddress();
142     }
143 
144     /**
145      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
146      *
147      * @return  opsId
148      */
149     protected final short nextOpsId() {
150         short id = opsId++;
151 
152         // We use 0 for "none".
153         if (id == 0) {
154             id = opsId++;
155         }
156         return id;
157     }
158 
159     public final boolean isOpen() {
160         return socket.isOpen();
161     }
162 
163     @Override
164     public boolean isActive() {
165         return active;
166     }
167 
168     @Override
169     public final FileDescriptor fd() {
170         return socket;
171     }
172 
173     private AbstractUringUnsafe ioUringUnsafe() {
174         return (AbstractUringUnsafe) unsafe();
175     }
176 
177     @Override
178     protected boolean isCompatible(final EventLoop loop) {
179         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
180     }
181 
182     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
183         return newDirectBuffer(buf, buf);
184     }
185 
186     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
187         final int readableBytes = buf.readableBytes();
188         if (readableBytes == 0) {
189             ReferenceCountUtil.release(holder);
190             return Unpooled.EMPTY_BUFFER;
191         }
192 
193         final ByteBufAllocator alloc = alloc();
194         if (alloc.isDirectBufferPooled()) {
195             return newDirectBuffer0(holder, buf, alloc, readableBytes);
196         }
197 
198         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
199         if (directBuf == null) {
200             return newDirectBuffer0(holder, buf, alloc, readableBytes);
201         }
202 
203         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
204         ReferenceCountUtil.safeRelease(holder);
205         return directBuf;
206     }
207 
208     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
209         final ByteBuf directBuf = alloc.directBuffer(capacity);
210         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
211         ReferenceCountUtil.safeRelease(holder);
212         return directBuf;
213     }
214 
    /**
     * Cancel all outstanding reads. This is invoked when the channel is closed while it has a registration.
     *
     * @param registration          the {@link IoUringIoRegistration}.
     * @param numOutstandingReads   the number of outstanding reads.
     */
    protected abstract void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads);
222 
    /**
     * Cancel all outstanding writes. This is invoked when the channel is closed while it has a registration.
     *
     * @param registration          the {@link IoUringIoRegistration}.
     * @param numOutstandingWrites  the number of outstanding writes.
     */
    protected abstract void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites);
230 
    /**
     * Disconnect hook. This base implementation performs no action; as the method is not final, subclasses
     * may override it.
     */
    @Override
    protected void doDisconnect() throws Exception {
        // Nothing to do.
    }
234 
235     private void freeRemoteAddressMemory() {
236         if (remoteAddressMemory != null) {
237             Buffer.free(remoteAddressMemory);
238             remoteAddressMemory = null;
239         }
240     }
241 
242     private void freeMsgHdrArray() {
243         if (msgHdrMemoryArray != null) {
244             msgHdrMemoryArray.release();
245             msgHdrMemoryArray = null;
246         }
247     }
248 
249     @Override
250     protected void doClose() throws Exception {
251         active = false;
252 
253         if (registration != null) {
254             if (socket.markClosed()) {
255                 int fd = fd().intValue();
256                 IoUringIoOps ops = IoUringIoOps.newClose(fd, 0, nextOpsId());
257                 registration.submit(ops);
258             }
259         } else {
260             // This one was never registered just use a syscall to close.
261             socket.close();
262             freeRemoteAddressMemory();
263             freeMsgHdrArray();
264         }
265     }
266 
267     @Override
268     protected final void doBeginRead() {
269         if (inputClosedSeenErrorOnRead) {
270             // We did see an error while reading and so closed the input. Stop reading.
271             return;
272         }
273         if (readPending) {
274             // We already have a read pending.
275             return;
276         }
277         if (inReadComplete || !isActive()) {
278             // We are currently in the readComplete(...) callback which might issue more reads by itself. Just
279             // mark it as a pending read. If readComplete(...) will not issue more reads itself it will pick up
280             // the readPending flag, reset it and call doBeginReadNow().
281             readPending = true;
282             return;
283         }
284         doBeginReadNow();
285     }
286 
287     private void doBeginReadNow() {
288         if (socket.isBlocking()) {
289             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
290             ioUringUnsafe().scheduleFirstReadIfNeeded();
291         } else {
292             if ((ioState & POLL_IN_SCHEDULED) == 0) {
293                 ioUringUnsafe().schedulePollIn();
294             }
295         }
296     }
297 
298     @Override
299     protected void doWrite(ChannelOutboundBuffer in) {
300         if ((ioState & WRITE_SCHEDULED) != 0) {
301             return;
302         }
303         if (scheduleWrite(in) > 0) {
304             ioState |= WRITE_SCHEDULED;
305         }
306     }
307 
308     private int scheduleWrite(ChannelOutboundBuffer in) {
309         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
310             return 0;
311         }
312         if (in == null) {
313             return 0;
314         }
315 
316         int msgCount = in.size();
317         if (msgCount == 0) {
318             return 0;
319         }
320         Object msg = in.current();
321 
322         if (msgCount > 1) {
323             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
324         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
325                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
326             // We also need some special handling for CompositeByteBuf
327             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
328         } else {
329             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
330         }
331         // Ensure we never overflow
332         assert numOutstandingWrites > 0;
333         return numOutstandingWrites;
334     }
335 
336     protected final IoUringIoRegistration registration() {
337         assert registration != null;
338         return registration;
339     }
340 
341     private void schedulePollOut() {
342         // This should only be done if the socket is non-blocking.
343         assert !socket.isBlocking();
344 
345         assert (ioState & POLL_OUT_SCHEDULED) == 0;
346         int fd = fd().intValue();
347         IoUringIoRegistration registration = registration();
348         IoUringIoOps ops = IoUringIoOps.newPollAdd(
349                 fd, 0, Native.POLLOUT, (short) Native.POLLOUT);
350         pollOutId = registration.submit(ops);
351         ioState |= POLL_OUT_SCHEDULED;
352     }
353 
354     final void schedulePollRdHup() {
355         assert (ioState & POLL_RDHUP_SCHEDULED) == 0;
356         int fd = fd().intValue();
357         IoUringIoRegistration registration = registration();
358         IoUringIoOps ops = IoUringIoOps.newPollAdd(
359                 fd, 0, Native.POLLRDHUP, (short) Native.POLLRDHUP);
360         pollRdhupId = registration.submit(ops);
361         ioState |= POLL_RDHUP_SCHEDULED;
362     }
363 
364     final void resetCachedAddresses() {
365         local = socket.localAddress();
366         remote = socket.remoteAddress();
367     }
368 
369     abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
        // Lazily created by recvBufAllocHandle().
        private IoUringRecvByteAllocatorHandle allocHandle;
371 
        /**
         * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
         * {@link #writeComplete(int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param in    the {@link ChannelOutboundBuffer} that holds the messages to write.
         * @return      the number of expected write completions.
         */
        protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
377 
        /**
         * Schedule the write of a single message and returns the number of {@link #writeComplete(int, int, short)}
         * calls that are expected because of the scheduled write.
         *
         * @param msg   the message to write.
         * @return      the number of expected write completions.
         */
        protected abstract int scheduleWriteSingle(Object msg);
383 
        // Set once the completion of the IORING_OP_CLOSE operation was handled.
        private boolean closed;
        // Set once freeResourcesNow(...) was invoked, so it is never invoked twice.
        private boolean freed;
386 
387         @Override
388         public final void handle(IoRegistration registration, IoEvent ioEvent) {
389             IoUringIoRegistration reg = (IoUringIoRegistration) registration;
390             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
391             byte op = event.opcode();
392             int res = event.res();
393             int flags = event.flags();
394             short data = event.data();
395             switch (op) {
396                 case Native.IORING_OP_RECV:
397                 case Native.IORING_OP_ACCEPT:
398                 case Native.IORING_OP_RECVMSG:
399                 case Native.IORING_OP_READ:
400                     readComplete(res, flags, data);
401 
402                     // We delay the actual close if there is still a write or read scheduled, let's see if there
403                     // was a close that needs to be done now.
404                     handleDelayedClosed();
405                     break;
406                 case Native.IORING_OP_WRITEV:
407                 case Native.IORING_OP_SEND:
408                 case Native.IORING_OP_SENDMSG:
409                 case Native.IORING_OP_WRITE:
410                     writeComplete(res, flags, data);
411 
412                     // We delay the actual close if there is still a write or read scheduled, let's see if there
413                     // was a close that needs to be done now.
414                     handleDelayedClosed();
415                     break;
416                 case Native.IORING_OP_POLL_ADD:
417                     pollAddComplete(res, flags, data);
418                     break;
419                 case Native.IORING_OP_POLL_REMOVE:
420                     // fd reuse can replace a closed channel (with pending POLL_REMOVE CQEs)
421                     // with a new one: better not making it to mess-up with its state!
422                     if (res == 0) {
423                         clearPollFlag(data);
424                     }
425                     break;
426                 case Native.IORING_OP_ASYNC_CANCEL:
427                     cancelComplete0(res, flags, data);
428                     break;
429                 case Native.IORING_OP_CONNECT:
430                     connectComplete(res, flags, data);
431 
432                     // once the connect was completed we can also free some resources that are not needed anymore.
433                     freeMsgHdrArray();
434                     freeRemoteAddressMemory();
435                     break;
436                 case Native.IORING_OP_CLOSE:
437                     if (delayedClose != null) {
438                         delayedClose.setSuccess();
439                     }
440                     closed = true;
441                     break;
442             }
443 
444             if (ioState == 0 && closed && !freed) {
445                 freed = true;
446                 freeResourcesNow(reg);
447             }
448         }
449 
        /**
         * Free all resources now. No new IO will be submitted for this channel via io_uring.
         * This implementation releases the msghdr array and the remote address memory and then cancels the
         * given registration.
         *
         * @param reg   the {@link IoUringIoRegistration}.
         */
        protected void freeResourcesNow(IoUringIoRegistration reg) {
            freeMsgHdrArray();
            freeRemoteAddressMemory();
            reg.cancel();
        }
460 
461         private void handleDelayedClosed() {
462             if (delayedClose != null && canCloseNow()) {
463                 closeNow(newPromise());
464             }
465         }
466 
467         private void pollAddComplete(int res, int flags, short data) {
468             if ((data & Native.POLLOUT) != 0) {
469                 pollOut(res);
470             }
471             if ((data & Native.POLLIN) != 0) {
472                 pollIn(res);
473             }
474             if ((data & Native.POLLRDHUP) != 0) {
475                 pollRdHup(res);
476             }
477         }
478 
479         @Override
480         public final void close() throws Exception {
481             close(voidPromise());
482         }
483 
        /**
         * Requests the close of the channel. The first call records the promise as {@code delayedClose}, fails
         * a pending connect attempt and cancels outstanding connect, read and write operations; the actual
         * close is executed right away only if no read or write is scheduled. Any later call just links the
         * given promise to the recorded one.
         *
         * @param promise   the promise to notify; a void promise is replaced by a new promise.
         */
        @Override
        public final void close(ChannelPromise promise) {
            if (delayedClose == null) {
                // Remember the promise. There may still be a write or read operation pending; in that case
                // we will do the actual close operation once its result is returned, as otherwise
                // we may get into trouble as we may close the fd while we did not process the result yet.
                delayedClose = promise.isVoid() ? newPromise() : promise;
            } else {
                // A close was requested before, just notify the given promise once that one completes.
                delayedClose.addListener(new PromiseNotifier<>(false, promise));
                return;
            }

            boolean cancelConnect = false;
            try {
                ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                if (connectPromise != null) {
                    // Use tryFailure() instead of setFailure() to avoid the race against cancel().
                    connectPromise.tryFailure(new ClosedChannelException());
                    AbstractIoUringChannel.this.connectPromise = null;
                    cancelConnect = true;
                }

                cancelConnectTimeoutFuture();
            } finally {
                // It's important we cancel all outstanding connect, write and read operations now so
                // we will be able to process a delayed close if needed.
                if (registration != null) {
                    if (cancelConnect && connectId != 0) {
                        int fd = fd().intValue();
                        // Best effort to cancel the already submitted connect request.
                        registration.submit(IoUringIoOps.newAsyncCancel(
                                fd, 0, connectId, Native.IORING_OP_CONNECT));
                    }
                    cancelOutstandingReads(registration, numOutstandingReads);
                    cancelOutstandingWrites(registration, numOutstandingWrites);
                }
            }

            if (canCloseNow()) {
                // Currently there is no WRITE and READ scheduled so we can start to teardown the channel.
                closeNow(newPromise());
            }
        }
527 
528         private boolean canCloseNow() {
529             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
530             // problems related to re-ordering of completions.
531             return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
532         }
533 
        /**
         * Executes the close immediately by delegating to the {@code close(ChannelPromise)} implementation of
         * the super class, bypassing the delaying logic of this class.
         */
        private void closeNow(ChannelPromise promise) {
            super.close(promise);
        }
537 
538         @Override
539         protected final void flush0() {
540             // Flush immediately only when there's no pending flush.
541             // If there's a pending flush operation, event loop will call forceFlush() later,
542             // and thus there's no need to call it now.
543             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
544                 super.flush0();
545             }
546         }
547 
548         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
549             if (promise == null) {
550                 // Closed via cancellation and the promise has been notified already.
551                 return;
552             }
553 
554             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
555             promise.tryFailure(cause);
556             closeIfClosed();
557         }
558 
559         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
560             if (promise == null) {
561                 // Closed via cancellation and the promise has been notified already.
562                 return;
563             }
564             active = true;
565 
566             if (local == null) {
567                 local = socket.localAddress();
568             }
569             computeRemote();
570 
571             // Register POLLRDHUP
572             schedulePollRdHup();
573 
574             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
575             // We still need to ensure we call fireChannelActive() in this case.
576             boolean active = isActive();
577 
578             // trySuccess() will return false if a user cancelled the connection attempt.
579             boolean promiseSet = promise.trySuccess();
580 
581             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
582             // because what happened is what happened.
583             if (!wasActive && active) {
584                 pipeline().fireChannelActive();
585             }
586 
587             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
588             if (!promiseSet) {
589                 close(voidPromise());
590             }
591         }
592 
593         @Override
594         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
595             if (allocHandle == null) {
596                 allocHandle = new IoUringRecvByteAllocatorHandle(
597                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
598             }
599             return allocHandle;
600         }
601 
602         final void shutdownInput(boolean allDataRead) {
603             logger.trace("shutdownInput Fd: {}", fd().intValue());
604             if (!socket.isInputShutdown()) {
605                 if (isAllowHalfClosure(config())) {
606                     try {
607                         socket.shutdown(true, false);
608                     } catch (IOException ignored) {
609                         // We attempted to shutdown and failed, which means the input has already effectively been
610                         // shutdown.
611                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
612                         return;
613                     } catch (NotYetConnectedException ignore) {
614                         // We attempted to shutdown and failed, which means the input has already effectively been
615                         // shutdown.
616                     }
617                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
618                 } else {
619                     close(voidPromise());
620                     return;
621                 }
622             }
623             if (allDataRead && !inputClosedSeenErrorOnRead) {
624                 inputClosedSeenErrorOnRead = true;
625                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
626             }
627         }
628 
        /**
         * Fires the given user event through the pipeline and closes the channel afterwards.
         *
         * @param evt   the user event to fire.
         */
        private void fireEventAndClose(Object evt) {
            pipeline().fireUserEventTriggered(evt);
            close(voidPromise());
        }
633 
634         final void schedulePollIn() {
635             // This should only be done if the socket is non-blocking.
636             assert !socket.isBlocking();
637 
638             assert (ioState & POLL_IN_SCHEDULED) == 0;
639             if (!isActive() || shouldBreakIoUringInReady(config())) {
640                 return;
641             }
642             int fd = fd().intValue();
643             IoUringIoRegistration registration = registration();
644             IoUringIoOps ops = IoUringIoOps.newPollAdd(
645                     fd, 0, Native.POLLIN, (short) Native.POLLIN);
646             pollInId = registration.submit(ops);
647             ioState |= POLL_IN_SCHEDULED;
648         }
649 
650         private void readComplete(int res, int flags, int data) {
651             assert numOutstandingReads > 0;
652             if (--numOutstandingReads == 0) {
653                 readPending = false;
654                 ioState &= ~READ_SCHEDULED;
655             }
656             inReadComplete = true;
657             try {
658                 readComplete0(res, flags, data, numOutstandingReads);
659             } finally {
660                 inReadComplete = false;
661                 // There is a pending read and readComplete0(...) also did stop issue read request.
662                 // Let's trigger the requested read now.
663                 if (readPending && recvBufAllocHandle().isReadComplete()) {
664                     doBeginReadNow();
665                 }
666             }
667         }
668 
        /**
         * Called once a read was completed.
         *
         * @param res                   the result.
         * @param flags                 the flags.
         * @param data                  the data that was passed when submitting the op.
         * @param outstandingCompletes  the number of reads that are still outstanding after this completion.
         */
        protected abstract void readComplete0(int res, int flags, int data, int outstandingCompletes);
673 
674         /**
675          * Called once POLLRDHUP event is ready to be processed
676          */
677         private void pollRdHup(int res) {
678             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
679                 return;
680             }
681             ioState &= ~POLL_RDHUP_SCHEDULED;
682 
683             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
684             recvBufAllocHandle().rdHupReceived();
685 
686             if (isActive()) {
687                 scheduleFirstReadIfNeeded();
688             } else {
689                 // Just to be safe make sure the input marked as closed.
690                 shutdownInput(false);
691             }
692         }
693 
694         /**
695          * Called once POLLIN event is ready to be processed
696          */
697         private void pollIn(int res) {
698             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
699                 return;
700             }
701             ioState &= ~POLL_IN_SCHEDULED;
702 
703             scheduleFirstReadIfNeeded();
704         }
705 
706         private void scheduleFirstReadIfNeeded() {
707             if ((ioState & READ_SCHEDULED) == 0) {
708                 scheduleFirstRead();
709             }
710         }
711 
712         private void scheduleFirstRead() {
713             // This is a new "read loop" so we need to reset the allocHandle.
714             final ChannelConfig config = config();
715             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
716             allocHandle.reset(config);
717             scheduleRead(true);
718         }
719 
720         protected final void scheduleRead(boolean first) {
721             // Only schedule another read if the fd is still open.
722             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
723                 numOutstandingReads = (short) scheduleRead0(first);
724                 if (numOutstandingReads > 0) {
725                     ioState |= READ_SCHEDULED;
726                 }
727             }
728         }
729 
        /**
         * Schedule a read and returns the number of {@link #readComplete(int, int, int)} calls that are expected
         * because of the scheduled read. A returned value that is not greater than {@code 0} means that no read
         * is considered scheduled.
         *
         * @param first {@code true} if this is the first read of a read loop.
         * @return      the number of expected read completions.
         */
        protected abstract int scheduleRead0(boolean first);
737 
        /**
         * Called once POLLOUT event is ready to be processed. A cancelled poll is ignored. If a connection
         * attempt is pending the connect is finished (or POLLOUT is scheduled again if it is still in
         * progress), otherwise the flush is retried as long as the output is not shut down.
         *
         * @param res   the result.
         */
        private void pollOut(int res) {
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                return;
            }
            ioState &= ~POLL_OUT_SCHEDULED;
            // pending connect
            if (connectPromise != null) {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.

                assert eventLoop().inEventLoop();

                boolean connectStillInProgress = false;
                try {
                    boolean wasActive = isActive();
                    if (!socket.finishConnect()) {
                        // Not connected yet; the finally block below registers for POLLOUT again.
                        connectStillInProgress = true;
                        return;
                    }
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                    if (!connectStillInProgress) {
                        // The connection attempt is finished (successfully or not), so the connect timeout
                        // is not needed anymore.
                        // See https://github.com/netty/netty/issues/1770
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                    } else {
                        // This should only happen if the socket is non-blocking.
                        assert !socket.isBlocking();

                        // The connect was not done yet, register for POLLOUT again
                        schedulePollOut();
                    }
                }
            } else if (!socket.isOutputShutdown()) {
                // Try writing again
                super.flush0();
            }
        }
785 
786         /**
787          * Called once a write was completed.
788          *
789          * @param res   the result.
790          * @param flags the flags.
791          * @param data  the data that was passed when submitting the op.
792          */
793         private void writeComplete(int res, int flags, short data) {
794             if ((ioState & CONNECT_SCHEDULED) != 0) {
795                 // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
796                 // TCP_FASTOPEN_CONNECT.
797                 freeMsgHdrArray();
798                 if (res > 0) {
799                     // Connect complete!
800                     outboundBuffer().removeBytes(res);
801 
802                     // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
803                     connectComplete(0, flags, data);
804                 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
805                     // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
806                     // In this case, our TCP connection will be established normally, but no data was transmitted at
807                     // this time. We'll just transmit the data with normal writes later.
808                     // Let's submit a normal connect.
809                     submitConnect((InetSocketAddress) requestedRemoteAddress);
810                 } else {
811                     // There was an error, handle it as a normal connect error.
812                     connectComplete(res, flags, data);
813                 }
814                 return;
815             }
816             assert numOutstandingWrites > 0;
817             --numOutstandingWrites;
818 
819             boolean writtenAll = writeComplete0(res, flags, data, numOutstandingWrites);
820             if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
821                 // This should only happen if the socket is non-blocking.
822                 assert !socket.isBlocking();
823 
824                 // We were not able to write everything, let's register for POLLOUT
825                 schedulePollOut();
826             }
827 
828             // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
829             // while still removing messages internally in removeBytes(...) which then may corrupt state.
830             if (numOutstandingWrites == 0) {
831                 ioState &= ~WRITE_SCHEDULED;
832 
833                 // If we could write all and we did not schedule a pollout yet let us try to write again
834                 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
835                     doWrite(unsafe().outboundBuffer());
836                 }
837             }
838         }
839 
840         /**
841          * Called once a write was completed.
842          *
843          * @param res           the result.
844          * @param flags         the flags.
845          * @param data          the data that was passed when submitting the op.
846          * @param outstanding   the outstanding write completions.
847          */
848         abstract boolean writeComplete0(int res, int flags, int data, int outstanding);
849 
850         /**
851          * Called once a cancel was completed.
852          *
853          * @param res           the result.
854          * @param flags         the flags.
855          * @param data          the data that was passed when submitting the op.
856          */
857         void cancelComplete0(int res, int flags, short data) {
858             // NOOP
859         }
860 
861         /**
862          * Called once a connect was completed.
863          *
864          * @param res           the result.
865          * @param flags         the flags.
866          * @param data          the data that was passed when submitting the op.
867          */
868         void connectComplete(int res, int flags, short data) {
869             ioState &= ~CONNECT_SCHEDULED;
870             freeRemoteAddressMemory();
871 
872             if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
873                 // This should only happen if the socket is non-blocking.
874                 assert !socket.isBlocking();
875 
876                 // connect not complete yet need to wait for poll_out event
877                 schedulePollOut();
878             } else {
879                 try {
880                     if (res == 0) {
881                         fulfillConnectPromise(connectPromise, active);
882                         if (readPending) {
883                             doBeginReadNow();
884                         }
885                     } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
886                         try {
887                             Errors.throwConnectException("io_uring connect", res);
888                         } catch (Throwable cause) {
889                             fulfillConnectPromise(connectPromise, cause);
890                         }
891                     }
892                 } finally {
893                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
894                     // used
895                     // See https://github.com/netty/netty/issues/1770
896                     cancelConnectTimeoutFuture();
897                     connectPromise = null;
898                 }
899             }
900         }
901 
902         @Override
903         public void connect(
904                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
905             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
906             // non-blocking io.
907             if (promise.isDone() || !ensureOpen(promise)) {
908                 return;
909             }
910 
911             if (delayedClose != null) {
912                 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
913                 return;
914             }
915             try {
916                 if (connectPromise != null) {
917                     throw new ConnectionPendingException();
918                 }
919                 if (localAddress instanceof InetSocketAddress) {
920                     checkResolvable((InetSocketAddress) localAddress);
921                 }
922 
923                 if (remoteAddress instanceof InetSocketAddress) {
924                     checkResolvable((InetSocketAddress) remoteAddress);
925                 }
926 
927                 if (remote != null) {
928                     // Check if already connected before trying to connect. This is needed as connect(...) will not#
929                     // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
930                     // EINPROGRESS and finished later.
931                     throw new AlreadyConnectedException();
932                 }
933 
934                 if (localAddress != null) {
935                     socket.bind(localAddress);
936                 }
937 
938                 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
939 
940                 ByteBuf initialData = null;
941                 if (IoUring.isTcpFastOpenClientSideAvailable() &&
942                         config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
943                     ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
944                     outbound.addFlush();
945                     Object curr;
946                     if ((curr = outbound.current()) instanceof ByteBuf) {
947                         initialData = (ByteBuf) curr;
948                     }
949                 }
950                 if (initialData != null) {
951                     msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
952                     MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
953                     hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
954                             initialData.readableBytes(), (short) 0);
955 
956                     int fd = fd().intValue();
957                     IoUringIoRegistration registration = registration();
958                     IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, 0, Native.MSG_FASTOPEN,
959                             hdr.address(), hdr.idx());
960                     connectId = registration.submit(ops);
961                 } else {
962                     submitConnect(inetSocketAddress);
963                 }
964 
965                 ioState |= CONNECT_SCHEDULED;
966             } catch (Throwable t) {
967                 closeIfClosed();
968                 promise.tryFailure(annotateConnectException(t, remoteAddress));
969                 return;
970             }
971             connectPromise = promise;
972             requestedRemoteAddress = remoteAddress;
973             // Schedule connect timeout.
974             int connectTimeoutMillis = config().getConnectTimeoutMillis();
975             if (connectTimeoutMillis > 0) {
976                 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
977                     @Override
978                     public void run() {
979                         ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
980                         if (connectPromise != null && !connectPromise.isDone() &&
981                                 connectPromise.tryFailure(new ConnectTimeoutException(
982                                         "connection timed out: " + remoteAddress))) {
983                             close(voidPromise());
984                         }
985                     }
986                 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
987             }
988 
989             promise.addListener(new ChannelFutureListener() {
990                 @Override
991                 public void operationComplete(ChannelFuture future) {
992                     // If the connect future is cancelled we also cancel the timeout and close the
993                     // underlying socket.
994                     if (future.isCancelled()) {
995                         cancelConnectTimeoutFuture();
996                         connectPromise = null;
997                         close(voidPromise());
998                     }
999                 }
1000             });
1001         }
1002     }
1003 
1004     private void submitConnect(InetSocketAddress inetSocketAddress) {
1005         remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1006         long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
1007 
1008         SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
1009 
1010         int fd = fd().intValue();
1011         IoUringIoRegistration registration = registration();
1012         IoUringIoOps ops = IoUringIoOps.newConnect(
1013                 fd, 0, remoteAddressMemoryAddress, nextOpsId());
1014         connectId = registration.submit(ops);
1015     }
1016 
1017     @Override
1018     protected Object filterOutboundMessage(Object msg) {
1019         if (msg instanceof ByteBuf) {
1020             ByteBuf buf = (ByteBuf) msg;
1021             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1022         }
1023 
1024         throw new UnsupportedOperationException("unsupported message type");
1025     }
1026 
1027     @Override
1028     protected void doRegister(ChannelPromise promise) {
1029         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1030         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1031             if (f.isSuccess()) {
1032                 registration = (IoUringIoRegistration) f.getNow();
1033                 promise.setSuccess();
1034             } else {
1035                 promise.setFailure(f.cause());
1036             }
1037         });
1038     }
1039 
1040     @Override
1041     protected final void doDeregister() {
1042         if (registration == null) {
1043             return;
1044         }
1045         int fd = fd().intValue();
1046         if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
1047             registration.submit(IoUringIoOps.newPollRemove(
1048                     fd, 0, pollRdhupId, (short) Native.POLLRDHUP));
1049         }
1050         if ((ioState & POLL_IN_SCHEDULED) != 0) {
1051             registration.submit(IoUringIoOps.newPollRemove(
1052                     fd, 0, pollInId, (short) Native.POLLIN));
1053         }
1054         if ((ioState & POLL_OUT_SCHEDULED) != 0) {
1055             registration.submit(IoUringIoOps.newPollRemove(
1056                     fd,  0, pollOutId, (short) Native.POLLOUT));
1057         }
1058 
1059         registration = null;
1060     }
1061 
1062     @Override
1063     protected void doBind(final SocketAddress local) throws Exception {
1064         if (local instanceof InetSocketAddress) {
1065             checkResolvable((InetSocketAddress) local);
1066         }
1067         socket.bind(local);
1068         this.local = socket.localAddress();
1069     }
1070 
1071     protected static void checkResolvable(InetSocketAddress addr) {
1072         if (addr.isUnresolved()) {
1073             throw new UnresolvedAddressException();
1074         }
1075     }
1076 
1077     @Override
1078     protected final SocketAddress localAddress0() {
1079         return local;
1080     }
1081 
1082     @Override
1083     protected final SocketAddress remoteAddress0() {
1084         return remote;
1085     }
1086 
1087     private static boolean isAllowHalfClosure(ChannelConfig config) {
1088         return config instanceof SocketChannelConfig &&
1089                ((SocketChannelConfig) config).isAllowHalfClosure();
1090     }
1091 
1092     private void cancelConnectTimeoutFuture() {
1093         if (connectTimeoutFuture != null) {
1094             connectTimeoutFuture.cancel(false);
1095             connectTimeoutFuture = null;
1096         }
1097     }
1098 
1099     private void computeRemote() {
1100         if (requestedRemoteAddress instanceof InetSocketAddress) {
1101             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1102         }
1103     }
1104 
1105     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1106         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1107     }
1108 
1109     private void clearPollFlag(int pollMask) {
1110         if ((pollMask & Native.POLLIN) != 0) {
1111             ioState &= ~POLL_IN_SCHEDULED;
1112         }
1113         if ((pollMask & Native.POLLOUT) != 0) {
1114             ioState &= ~POLL_OUT_SCHEDULED;
1115         }
1116         if ((pollMask & Native.POLLRDHUP) != 0) {
1117             ioState &= ~POLL_RDHUP_SCHEDULED;
1118         }
1119      }
1120 }