View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.DomainSocketAddress;
43  import io.netty.channel.unix.Errors;
44  import io.netty.channel.unix.FileDescriptor;
45  import io.netty.channel.unix.UnixChannel;
46  import io.netty.channel.unix.UnixChannelUtil;
47  import io.netty.util.ReferenceCountUtil;
48  import io.netty.util.concurrent.PromiseNotifier;
49  import io.netty.util.internal.CleanableDirectBuffer;
50  import io.netty.util.internal.logging.InternalLogger;
51  import io.netty.util.internal.logging.InternalLoggerFactory;
52  
53  import java.io.IOException;
54  import java.net.InetSocketAddress;
55  import java.net.SocketAddress;
56  import java.nio.ByteBuffer;
57  import java.nio.channels.AlreadyConnectedException;
58  import java.nio.channels.ClosedChannelException;
59  import java.nio.channels.ConnectionPendingException;
60  import java.nio.channels.NotYetConnectedException;
61  import java.nio.channels.UnresolvedAddressException;
62  import java.util.concurrent.ScheduledFuture;
63  import java.util.concurrent.TimeUnit;
64  
65  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
66  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
67  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
68  import static io.netty.util.internal.ObjectUtil.checkNotNull;
69  import static io.netty.util.internal.StringUtil.className;
70  
71  
72  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
    // The socket backing this channel; returned by fd() and consulted by isOpen().
    final LinuxSocket socket;
    // Backing state of isActive().
    protected volatile boolean active;

    // Different masks for outstanding I/O operations. They are OR-ed into and tested against ioState.
    private static final int POLL_IN_SCHEDULED = 1;
    private static final int POLL_OUT_SCHEDULED = 1 << 2;
    private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
    private static final int WRITE_SCHEDULED = 1 << 4;
    private static final int READ_SCHEDULED = 1 << 5;
    private static final int CONNECT_SCHEDULED = 1 << 6;

    // Counter used by nextOpsId() to hand out ids for submitted operations.
    private short opsId = Short.MIN_VALUE;

    // Ids of the submitted POLLIN / POLLOUT / POLLRDHUP polls. cancelOps(...) uses them to submit an
    // async cancel and resets them to 0 afterwards.
    private long pollInId;
    private long pollOutId;
    private long pollRdhupId;
    // Id used by cancelOps(...) for a best-effort cancel of the connect.
    // NOTE(review): presumably assigned where the connect is submitted - not visible here, confirm.
    private long connectId;

    // Bit set built from the *_SCHEDULED masks above. A byte is enough for now.
    private byte ioState;

    // It's possible that multiple read / writes are issued. We need to keep track of these.
    // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
    // enough but let's be a bit more flexible for now.
    private short numOutstandingWrites;
    // A value of -1 means that multi-shot is used and so reads will be issued as long as the request is not canceled.
    private short numOutstandingReads;

    // Set by doBeginRead() when a read was requested; reset by clearRead() and by readComplete(...).
    private boolean readPending;
    // Set to true when readComplete(...) starts processing; doBeginRead() does not issue a read while it is set.
    private boolean inReadComplete;
    // Whether the last read completion carried IORING_CQE_F_SOCK_NONEMPTY (only evaluated when supported).
    private boolean socketHasMoreData;
105 
106     private static final class DelayedClose {
107         private final ChannelPromise promise;
108         private final Throwable cause;
109         private final ClosedChannelException closeCause;
110 
111         DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
112             this.promise = promise;
113             this.cause = cause;
114             this.closeCause = closeCause;
115         }
116     }
    // Non-null once a close was requested through AbstractUringUnsafe.close(...); holds the arguments that
    // closeNow() later hands to the super implementation.
    private DelayedClose delayedClose;
    // Once true, doBeginRead() and doBeginReadNow() do not schedule reads anymore.
    private boolean inputClosedSeenErrorOnRead;

    /**
     * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;
    // cleanable and remoteAddressMemory are released and reset together by freeRemoteAddressMemory().
    private CleanableDirectBuffer cleanable;
    private ByteBuffer remoteAddressMemory;
    // Released and reset by freeMsgHdrArray().
    private MsgHdrMemoryArray msgHdrMemoryArray;

    // Used to submit operations; when it is null doClose() closes the socket directly instead.
    private IoRegistration registration;

    // Cached local and remote addresses, see the constructors and resetCachedAddresses().
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
134 
135     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
136         super(parent);
137         this.socket = checkNotNull(socket, "fd");
138 
139         if (active) {
140             // Directly cache the remote and local addresses
141             // See https://github.com/netty/netty/issues/2359
142             this.active = true;
143             this.local = socket.localAddress();
144             this.remote = socket.remoteAddress();
145         }
146 
147         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
148     }
149 
150     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
151         super(parent);
152         this.socket = checkNotNull(fd, "fd");
153         this.active = true;
154 
155         // Directly cache the remote and local addresses
156         // See https://github.com/netty/netty/issues/2359
157         this.remote = remote;
158         this.local = fd.localAddress();
159     }
160 
161     // Called once a Channel changed from AUTO_READ=true to AUTO_READ=false
162     final void autoReadCleared() {
163         if (!isRegistered()) {
164             return;
165         }
166         IoRegistration registration = this.registration;
167         if (registration == null || !registration.isValid()) {
168             return;
169         }
170         if (eventLoop().inEventLoop()) {
171             clearRead();
172         } else {
173             eventLoop().execute(this::clearRead);
174         }
175     }
176 
177     private void clearRead() {
178         assert eventLoop().inEventLoop();
179         readPending = false;
180         IoRegistration registration = this.registration;
181         if (registration == null || !registration.isValid()) {
182             return;
183         }
184         // Also cancel all outstanding reads as the user did signal there is no more desire to read.
185         cancelOutstandingReads(registration(), numOutstandingReads);
186     }
187 
188     /**
189      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
190      *
191      * @return  opsId
192      */
193     protected final short nextOpsId() {
194         short id = opsId++;
195 
196         // We use 0 for "none".
197         if (id == 0) {
198             id = opsId++;
199         }
200         return id;
201     }
202 
203     public final boolean isOpen() {
204         return socket.isOpen();
205     }
206 
207     @Override
208     public boolean isActive() {
209         return active;
210     }
211 
212     @Override
213     public final FileDescriptor fd() {
214         return socket;
215     }
216 
217     private AbstractUringUnsafe ioUringUnsafe() {
218         return (AbstractUringUnsafe) unsafe();
219     }
220 
221     @Override
222     protected boolean isCompatible(final EventLoop loop) {
223         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
224     }
225 
226     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
227         return newDirectBuffer(buf, buf);
228     }
229 
230     protected boolean allowMultiShotPollIn() {
231         return IoUring.isPollAddMultishotEnabled();
232     }
233 
234     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
235         final int readableBytes = buf.readableBytes();
236         if (readableBytes == 0) {
237             ReferenceCountUtil.release(holder);
238             return Unpooled.EMPTY_BUFFER;
239         }
240 
241         final ByteBufAllocator alloc = alloc();
242         if (alloc.isDirectBufferPooled()) {
243             return newDirectBuffer0(holder, buf, alloc, readableBytes);
244         }
245 
246         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
247         if (directBuf == null) {
248             return newDirectBuffer0(holder, buf, alloc, readableBytes);
249         }
250 
251         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
252         ReferenceCountUtil.safeRelease(holder);
253         return directBuf;
254     }
255 
256     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
257         final ByteBuf directBuf = alloc.directBuffer(capacity);
258         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
259         ReferenceCountUtil.safeRelease(holder);
260         return directBuf;
261     }
262 
    /**
     * Cancel all outstanding reads. Called when auto-read is cleared and when the channel is closed.
     *
     * @param registration          the {@link IoRegistration}.
     * @param numOutstandingReads   the number of outstanding reads, or {@code -1} if multi-shot was used.
     */
    protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
270 
    /**
     * Cancel all outstanding writes. Called when the channel is closed.
     *
     * @param registration          the {@link IoRegistration}.
     * @param numOutstandingWrites  the number of outstanding writes.
     */
    protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
278 
    /**
     * Does nothing in this base class.
     */
    @Override
    protected void doDisconnect() throws Exception {
        // Intentionally empty.
    }
282 
283     private void freeRemoteAddressMemory() {
284         if (remoteAddressMemory != null) {
285             cleanable.clean();
286             cleanable = null;
287             remoteAddressMemory = null;
288         }
289     }
290 
291     private void freeMsgHdrArray() {
292         if (msgHdrMemoryArray != null) {
293             msgHdrMemoryArray.release();
294             msgHdrMemoryArray = null;
295         }
296     }
297 
298     @Override
299     protected void doClose() throws Exception {
300         active = false;
301 
302         if (registration != null) {
303             if (socket.markClosed()) {
304                 int fd = fd().intValue();
305                 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
306                 registration.submit(ops);
307             }
308         } else {
309             // This one was never registered just use a syscall to close.
310             socket.close();
311             ioUringUnsafe().freeResourcesNowIfNeeded(null);
312         }
313     }
314 
315     @Override
316     protected final void doBeginRead() {
317         if (inputClosedSeenErrorOnRead) {
318             // We did see an error while reading and so closed the input. Stop reading.
319             return;
320         }
321         if (readPending) {
322             // We already have a read pending.
323             return;
324         }
325         readPending = true;
326         if (inReadComplete || !isActive()) {
327             // We are currently in the readComplete(...) callback which might issue more reads by itself.
328             // If readComplete(...) will not issue more reads itself it will pick up the readPending flag, reset it and
329             // call doBeginReadNow().
330             return;
331         }
332         doBeginReadNow();
333     }
334 
335     private void doBeginReadNow() {
336         if (inputClosedSeenErrorOnRead) {
337             // We did see an error while reading and so closed the input.
338             return;
339         }
340         if (!isPollInFirst() ||
341                 // If the socket was not empty, and we stopped reading we need to ensure we just force the
342                 // read as POLLIN might be edge-triggered (in case of POLL_ADD_MULTI).
343                 socketHasMoreData) {
344             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
345             ioUringUnsafe().scheduleFirstReadIfNeeded();
346         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
347             ioUringUnsafe().schedulePollIn();
348         }
349     }
350 
351     @Override
352     protected void doWrite(ChannelOutboundBuffer in) {
353         scheduleWriteIfNeeded(in, true);
354     }
355 
356     protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
357         if ((ioState & WRITE_SCHEDULED) != 0) {
358             return;
359         }
360         if (scheduleWrite(in) > 0) {
361             ioState |= WRITE_SCHEDULED;
362             if (submitAndRunNow && !isWritable()) {
363                 submitAndRunNow();
364             }
365         }
366     }
367 
    /**
     * Hook invoked by {@link #scheduleWriteIfNeeded(ChannelOutboundBuffer, boolean)} after a write was
     * scheduled while the channel is not writable. Does nothing by default.
     */
    protected void submitAndRunNow() {
        // NOOP
    }
371 
372     private int scheduleWrite(ChannelOutboundBuffer in) {
373         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
374             return 0;
375         }
376         if (in == null) {
377             return 0;
378         }
379 
380         int msgCount = in.size();
381         if (msgCount == 0) {
382             return 0;
383         }
384         Object msg = in.current();
385 
386         if (msgCount > 1 && in.current() instanceof ByteBuf) {
387             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
388         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
389                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
390             // We also need some special handling for CompositeByteBuf
391             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
392         } else {
393             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
394         }
395         // Ensure we never overflow
396         assert numOutstandingWrites > 0;
397         return numOutstandingWrites;
398     }
399 
400     protected final IoRegistration registration() {
401         assert registration != null;
402         return registration;
403     }
404 
405     private void schedulePollOut() {
406         pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
407     }
408 
409     final void schedulePollRdHup() {
410         pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
411     }
412 
413     private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
414         assert (ioState & ioMask) == 0;
415         int fd = fd().intValue();
416         IoRegistration registration = registration();
417         IoUringIoOps ops = IoUringIoOps.newPollAdd(
418                 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
419         long id = registration.submit(ops);
420         if (id != 0) {
421             ioState |= (byte) ioMask;
422         }
423         return id;
424     }
425 
426     final void resetCachedAddresses() {
427         local = socket.localAddress();
428         remote = socket.remoteAddress();
429     }
430 
431     protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
        // Lazily created by recvBufAllocHandle().
        private IoUringRecvByteAllocatorHandle allocHandle;
        // Set once an IORING_OP_CLOSE completion that was not cancelled has been handled.
        private boolean closed;
        // Guards freeResourcesNow(...) so that it runs at most once via freeResourcesNowIfNeeded(...).
        private boolean freed;
        // Updated on every read completion from the completion flags.
        private boolean socketIsEmpty;
436 
        /**
         * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param in    the {@link ChannelOutboundBuffer} holding the messages to write.
         * @return      the number of expected write completions.
         */
        protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
442 
        /**
         * Schedule the write of a single message and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param msg   the message to write.
         * @return      the number of expected write completions.
         */
        protected abstract int scheduleWriteSingle(Object msg);
448 
449         @Override
450         public final void handle(IoRegistration registration, IoEvent ioEvent) {
451             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
452             byte op = event.opcode();
453             int res = event.res();
454             int flags = event.flags();
455             short data = event.data();
456             switch (op) {
457                 case Native.IORING_OP_RECV:
458                 case Native.IORING_OP_ACCEPT:
459                 case Native.IORING_OP_RECVMSG:
460                 case Native.IORING_OP_READ:
461                     readComplete(op, res, flags, data);
462                     break;
463                 case Native.IORING_OP_WRITEV:
464                 case Native.IORING_OP_SEND:
465                 case Native.IORING_OP_SENDMSG:
466                 case Native.IORING_OP_WRITE:
467                 case Native.IORING_OP_SPLICE:
468                 case Native.IORING_OP_SEND_ZC:
469                 case Native.IORING_OP_SENDMSG_ZC:
470                     writeComplete(op, res, flags, data);
471                     break;
472                 case Native.IORING_OP_POLL_ADD:
473                     pollAddComplete(res, flags, data);
474                     break;
475                 case Native.IORING_OP_ASYNC_CANCEL:
476                     cancelComplete0(op, res, flags, data);
477                     break;
478                 case Native.IORING_OP_CONNECT:
479                     connectComplete(op, res, flags, data);
480 
481                     // once the connect was completed we can also free some resources that are not needed anymore.
482                     freeMsgHdrArray();
483                     freeRemoteAddressMemory();
484                     break;
485                 case Native.IORING_OP_CLOSE:
486                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
487                         if (delayedClose != null) {
488                             delayedClose.promise.setSuccess();
489                         }
490                         closed = true;
491                     }
492                     break;
493                 default:
494                     break;
495             }
496 
497             // We delay the actual close if there is still a write or read scheduled, let's see if there
498             // was a close that needs to be done now.
499             handleDelayedClosed();
500 
501             if (ioState == 0 && closed) {
502                 freeResourcesNowIfNeeded(registration);
503             }
504         }
505 
506         private void freeResourcesNowIfNeeded(IoRegistration reg) {
507             if (!freed) {
508                 freed = true;
509                 freeResourcesNow(reg);
510             }
511         }
512 
513         /**
514          * Free all resources now. No new IO will be submitted for this channel via io_uring
515          *
516          * @param reg   the {@link IoRegistration} or {@code null} if it was never registered
517          */
518         protected void freeResourcesNow(IoRegistration reg) {
519             freeMsgHdrArray();
520             freeRemoteAddressMemory();
521             if (reg != null) {
522                 reg.cancel();
523             }
524         }
525 
526         private void handleDelayedClosed() {
527             if (delayedClose != null && canCloseNow()) {
528                 closeNow();
529             }
530         }
531 
532         private void pollAddComplete(int res, int flags, short data) {
533             if ((res & Native.POLLOUT) != 0) {
534                 pollOut(res);
535             }
536             if ((res & Native.POLLIN) != 0) {
537                 pollIn(res, flags, data);
538             }
539             if ((res & Native.POLLRDHUP) != 0) {
540                 pollRdHup(res);
541             }
542         }
543 
544         @Override
545         public final void close() throws Exception {
546             close(voidPromise());
547         }
548 
        /**
         * Requests the close of the channel. The request is remembered, a pending connect attempt is failed,
         * all outstanding operations are cancelled and the actual close is executed right away if possible;
         * otherwise it happens later once {@code handleDelayedClosed()} finds that the channel can be closed.
         *
         * @param promise       notified once the close completed.
         * @param cause         handed to the super implementation once the close is executed.
         * @param closeCause    handed to the super implementation once the close is executed.
         */
        @Override
        protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
            if (closeFuture().isDone()) {
                // Closed already before.
                safeSetSuccess(promise);
                return;
            }
            if (delayedClose == null) {
                // Remember the close request. There may be a write operation pending that should be completed
                // asap. We will do the actual close operation once this write result is returned as otherwise
                // we may get into trouble as we may close the fd while we did not process the write yet.
                delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
            } else {
                // A close was requested before; just notify the given promise once that one completes.
                delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
                return;
            }

            boolean cancelConnect = false;
            try {
                ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                if (connectPromise != null) {
                    // Use tryFailure() instead of setFailure() to avoid the race against cancel().
                    connectPromise.tryFailure(new ClosedChannelException());
                    AbstractIoUringChannel.this.connectPromise = null;
                    cancelConnect = true;
                }

                cancelConnectTimeoutFuture();
            } finally {
                // It's important we cancel all outstanding connect, write and read operations now so
                // we will be able to process a delayed close if needed.
                cancelOps(cancelConnect);
            }

            if (canCloseNow()) {
                // Currently there is no WRITE and READ scheduled so we can start to tear down the channel.
                closeNow();
            }
        }
588 
589         private void cancelOps(boolean cancelConnect) {
590             if (registration == null || !registration.isValid()) {
591                 return;
592             }
593             byte flags = (byte) 0;
594             if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
595                 long id = registration.submit(
596                         IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
597                 assert id != 0;
598                 pollRdhupId = 0;
599             }
600             if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
601                 long id = registration.submit(
602                         IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
603                 assert id != 0;
604                 pollInId = 0;
605             }
606             if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
607                 long id = registration.submit(
608                         IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
609                 assert id != 0;
610                 pollOutId = 0;
611             }
612             if (cancelConnect && connectId != 0) {
613                 // Best effort to cancel the already submitted connect request.
614                 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
615                 assert id != 0;
616                 connectId = 0;
617             }
618             cancelOutstandingReads(registration, numOutstandingReads);
619             cancelOutstandingWrites(registration, numOutstandingWrites);
620         }
621 
622         private boolean canCloseNow() {
623             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
624             // problems related to re-ordering of completions.
625             return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
626         }
627 
628         protected boolean canCloseNow0() {
629             return true;
630         }
631 
632         private void closeNow() {
633             super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
634         }
635 
636         @Override
637         protected final void flush0() {
638             // Flush immediately only when there's no pending flush.
639             // If there's a pending flush operation, event loop will call forceFlush() later,
640             // and thus there's no need to call it now.
641             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
642                 super.flush0();
643             }
644         }
645 
646         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
647             if (promise == null) {
648                 // Closed via cancellation and the promise has been notified already.
649                 return;
650             }
651 
652             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
653             promise.tryFailure(cause);
654             closeIfClosed();
655         }
656 
657         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
658             if (promise == null) {
659                 // Closed via cancellation and the promise has been notified already.
660                 return;
661             }
662             active = true;
663 
664             if (local == null) {
665                 local = socket.localAddress();
666             }
667             computeRemote();
668 
669             // Register POLLRDHUP
670             schedulePollRdHup();
671 
672             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
673             // We still need to ensure we call fireChannelActive() in this case.
674             boolean active = isActive();
675 
676             // trySuccess() will return false if a user cancelled the connection attempt.
677             boolean promiseSet = promise.trySuccess();
678 
679             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
680             // because what happened is what happened.
681             if (!wasActive && active) {
682                 pipeline().fireChannelActive();
683             }
684 
685             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
686             if (!promiseSet) {
687                 close(voidPromise());
688             }
689         }
690 
691         @Override
692         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
693             if (allocHandle == null) {
694                 allocHandle = new IoUringRecvByteAllocatorHandle(
695                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
696             }
697             return allocHandle;
698         }
699 
700         final void shutdownInput(boolean allDataRead) {
701             logger.trace("shutdownInput Fd: {}", fd().intValue());
702             if (!socket.isInputShutdown()) {
703                 if (isAllowHalfClosure(config())) {
704                     try {
705                         socket.shutdown(true, false);
706                     } catch (IOException ignored) {
707                         // We attempted to shutdown and failed, which means the input has already effectively been
708                         // shutdown.
709                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
710                         return;
711                     } catch (NotYetConnectedException ignore) {
712                         // We attempted to shutdown and failed, which means the input has already effectively been
713                         // shutdown.
714                     }
715                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
716                 } else {
717                     // Handle this same way as if we did read all data so we don't schedule another read.
718                     inputClosedSeenErrorOnRead = true;
719                     close(voidPromise());
720                     return;
721                 }
722             }
723             if (allDataRead && !inputClosedSeenErrorOnRead) {
724                 inputClosedSeenErrorOnRead = true;
725                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
726             }
727         }
728 
729         private void fireEventAndClose(Object evt) {
730             pipeline().fireUserEventTriggered(evt);
731             close(voidPromise());
732         }
733 
734         final void schedulePollIn() {
735             assert (ioState & POLL_IN_SCHEDULED) == 0;
736             if (!isActive() || shouldBreakIoUringInReady(config())) {
737                 return;
738             }
739             pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
740         }
741 
        /**
         * Called once a completion for a previously scheduled read was received.
         * <p>
         * Updates the read bookkeeping ({@code READ_SCHEDULED}, {@code numOutstandingReads} and
         * {@code readPending}), hands the completion to {@link #readComplete0(byte, int, int, short, int)} and
         * afterwards decides whether another read needs to be scheduled or whether an outstanding multi-shot
         * read needs to be cancelled.
         *
         * @param op    the op code.
         * @param res   the result.
         * @param flags the flags.
         * @param data  the data that was passed when submitting the op.
         */
        private void readComplete(byte op, int res, int flags, short data) {
            assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;

            // -1 marks a multi-shot read, for which the number of completions is not known upfront.
            boolean multishot = numOutstandingReads == -1;
            // Without IORING_CQE_F_MORE no further completions will follow for the submitted operation.
            boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
            if (rearm) {
                // Reset READ_SCHEDULED if there is nothing more to handle and so we need to re-arm. This works for
                // multi-shot and non multi-shot variants.
                ioState &= ~READ_SCHEDULED;
            }
            // Snapshot readPending before it is reset below, the ECANCELED handling depends on the old value.
            boolean pending = readPending;
            if (multishot) {
                // Reset readPending so we can still keep track if we might need to cancel the multi-shot read or
                // not.
                readPending = false;
            } else if (--numOutstandingReads == 0) {
                // We received all outstanding completions.
                readPending = false;
                ioState &= ~READ_SCHEDULED;
            }
            inReadComplete = true;
            try {
                socketIsEmpty = socketIsEmpty(flags);
                socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
                        (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
                readComplete0(op, res, flags, data, numOutstandingReads);
            } finally {
                try {
                    // Check if we should consider the read loop to be done.
                    if (recvBufAllocHandle().isReadComplete()) {
                        // Reset the handle as we are done with the read-loop.
                        recvBufAllocHandle().reset(config());

                        // Check if this was a readComplete(...) triggered by a read or multi-shot read.
                        if (!multishot) {
                            if (readPending) {
                                // This was a "normal" read and the user did signal we should continue reading.
                                // Let's schedule the read now.
                                doBeginReadNow();
                            }
                        } else {
                            // The readComplete(...) was triggered by a multi-shot read. Because of this the state
                            // machine is a bit more complicated.

                            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                                // The readComplete(...) was triggered because the previous read was cancelled.
                                // In this case we need to check if the user did signal the desire to read again
                                // in the meantime. If this is the case we need to schedule the read to ensure
                                // we do not stall.
                                if (pending) {
                                    doBeginReadNow();
                                }
                            } else if (rearm) {
                                // We need to rearm the multishot as otherwise we might miss some data.
                                doBeginReadNow();
                            } else if (!readPending) {
                                // Cancel the multi-shot read now as the user did not signal that we want to keep
                                // reading while we handle the completion event.
                                cancelOutstandingReads(registration, numOutstandingReads);
                            }
                        }
                    } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                        // The readComplete(...) was triggered because the previous read was cancelled.
                        // In this case we need to check if the user did signal the desire to read again
                        // in the meantime. If this is the case we need to schedule the read to ensure
                        // we do not stall.
                        if (pending) {
                            doBeginReadNow();
                        }
                    } else if (multishot && rearm) {
                        // We need to rearm the multishot as otherwise we might miss some data.
                        doBeginReadNow();
                    }
                } finally {
                    // These flags are only meaningful while a completion is being processed.
                    inReadComplete = false;
                    socketIsEmpty = false;
                }
            }
        }
821 
        /**
         * Called once a read was completed.
         *
         * @param op                    the op code.
         * @param res                   the result.
         * @param flags                 the flags.
         * @param data                  the data that was passed when submitting the op.
         * @param outstandingCompletes  the number of read completions that are still outstanding after this
         *                              one, or {@code -1} if the completion belongs to a multi-shot read.
         */
        protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
826 
827         /**
828          * Called once POLLRDHUP event is ready to be processed
829          */
830         private void pollRdHup(int res) {
831             ioState &= ~POLL_RDHUP_SCHEDULED;
832             pollRdhupId = 0;
833             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
834                 return;
835             }
836 
837             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
838             recvBufAllocHandle().rdHupReceived();
839 
840             if (isActive()) {
841                 scheduleFirstReadIfNeeded();
842             } else {
843                 // Just to be safe make sure the input marked as closed.
844                 shutdownInput(false);
845             }
846         }
847 
848         /**
849          * Called once POLLIN event is ready to be processed
850          */
851         private void pollIn(int res, int flags, short data) {
852             // Check if we need to rearm. This works for both cases, POLL_ADD and POLL_ADD_MULTI.
853             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
854             if (rearm) {
855                 ioState &= ~POLL_IN_SCHEDULED;
856                 pollInId = 0;
857             }
858             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
859                 return;
860             }
861             if (!readPending) {
862                 // We received the POLLIN but the user is not interested yet in reading, just mark socketHasMoreData
863                 // as true so we will trigger a read directly once the user calls read()
864                 socketHasMoreData = true;
865                 return;
866             }
867             scheduleFirstReadIfNeeded();
868         }
869 
870         private void scheduleFirstReadIfNeeded() {
871             if ((ioState & READ_SCHEDULED) == 0) {
872                 scheduleFirstRead();
873             }
874         }
875 
876         private void scheduleFirstRead() {
877             // This is a new "read loop" so we need to reset the allocHandle.
878             final ChannelConfig config = config();
879             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
880             allocHandle.reset(config);
881             scheduleRead(true);
882         }
883 
884         protected final void scheduleRead(boolean first) {
885             // Only schedule another read if the fd is still open.
886             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
887                 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
888                 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
889                     ioState |= READ_SCHEDULED;
890                 }
891             }
892         }
893 
        /**
         * Schedule a read and returns the number of {@link #readComplete(byte, int, int, short)}
         * calls that are expected because of the scheduled read.
         * <p>
         * The caller stores the returned value after narrowing it to a {@code short}.
         *
         * @param first             {@code true} if this is the first read of a read loop.
         * @param socketIsEmpty     {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
         * @return                  the number of {@link #readComplete(byte, int, int, short)} calls expected or
         *                          {@code -1} if {@link #readComplete(byte, int, int, short)} is called until
         *                          the read is cancelled (multi-shot).
         */
        protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
905 
        /**
         * Called once POLLOUT event is ready to be processed.
         * <p>
         * If a connect is pending this tries to finish it, otherwise (and only if the output was not shut down)
         * another flush is attempted.
         *
         * @param res   the result.
         */
        private void pollOut(int res) {
            // The POLLOUT is not outstanding anymore, no matter why it completed.
            ioState &= ~POLL_OUT_SCHEDULED;
            pollOutId = 0;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                return;
            }
            // pending connect
            if (connectPromise != null) {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.

                assert eventLoop().inEventLoop();

                boolean connectStillInProgress = false;
                try {
                    boolean wasActive = isActive();
                    if (!socket.finishConnect()) {
                        // The finally block below re-registers for POLLOUT in this case.
                        connectStillInProgress = true;
                        return;
                    }
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                    if (!connectStillInProgress) {
                        // cancelConnectTimeoutFuture() checks for null as the connectTimeoutFuture is only
                        // created if a connectTimeoutMillis > 0 is used
                        // See https://github.com/netty/netty/issues/1770
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                    } else {
                        // The connect was not done yet, register for POLLOUT again
                        schedulePollOut();
                    }
                }
            } else if (!socket.isOutputShutdown()) {
                // Try writing again
                super.flush0();
            }
        }
951 
        /**
         * Called once a write was completed.
         * <p>
         * While a connect is scheduled the completion belongs to the sendmsg(...) that was submitted for
         * TCP_FASTOPEN_CONNECT and is handled as part of the connect. Otherwise the completion is passed to
         * {@link #writeComplete0(byte, int, int, short, int)} and, depending on its outcome, either a POLLOUT
         * is registered or another write is attempted.
         *
         * @param op    the op code.
         * @param res   the result.
         * @param flags the flags.
         * @param data  the data that was passed when submitting the op.
         */
        private void writeComplete(byte op, int res, int flags, short data) {
            if ((ioState & CONNECT_SCHEDULED) != 0) {
                // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
                // TCP_FASTOPEN_CONNECT.
                freeMsgHdrArray();
                if (res > 0) {
                    // Connect complete!
                    outboundBuffer().removeBytes(res);

                    // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
                    connectComplete(op, 0, flags, data);
                } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
                    // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
                    // In this case, our TCP connection will be established normally, but no data was transmitted at
                    // this time. We'll just transmit the data with normal writes later.
                    // Let's submit a normal connect.
                    submitConnect((InetSocketAddress) requestedRemoteAddress);
                } else {
                    // There was an error, handle it as a normal connect error.
                    connectComplete(op, res, flags, data);
                }
                return;
            }

            // Completions that carry IORING_CQE_F_NOTIF are not counted as an outstanding write.
            if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
                assert numOutstandingWrites > 0;
                --numOutstandingWrites;
            }

            boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
            if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {

                // We were not able to write everything, let's register for POLLOUT
                schedulePollOut();
            }

            // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
            // while still removing messages internally in removeBytes(...) which then may corrupt state.
            if (numOutstandingWrites == 0) {
                ioState &= ~WRITE_SCHEDULED;

                // If we could write all and we did not schedule a pollout yet let us try to write again
                if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
                    scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
                }
            }
        }
1007 
        /**
         * Called once a write was completed.
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         * @param outstanding   the outstanding write completions.
         * @return              {@code true} if everything could be written, {@code false} otherwise. On
         *                      {@code false} the caller registers for POLLOUT unless one is already scheduled.
         */
        abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1017 
        /**
         * Called once a cancel was completed. This implementation does nothing.
         *
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         */
        void cancelComplete0(byte op, int res, int flags, short data) {
            // NOOP
        }
1029 
1030         /**
1031          * Called once a connect was completed.
1032          * @param op            the op code.
1033          * @param res           the result.
1034          * @param flags         the flags.
1035          * @param data          the data that was passed when submitting the op.
1036          */
1037         void connectComplete(byte op, int res, int flags, short data) {
1038             ioState &= ~CONNECT_SCHEDULED;
1039             freeRemoteAddressMemory();
1040 
1041             if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1042                 // connect not complete yet need to wait for poll_out event
1043                 schedulePollOut();
1044             } else {
1045                 try {
1046                     if (res == 0) {
1047                         fulfillConnectPromise(connectPromise, active);
1048                         if (readPending) {
1049                             doBeginReadNow();
1050                         }
1051                     } else {
1052                         try {
1053                             Errors.throwConnectException("io_uring connect", res);
1054                         } catch (Throwable cause) {
1055                             fulfillConnectPromise(connectPromise, cause);
1056                         }
1057                     }
1058                 } finally {
1059                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
1060                     // used
1061                     // See https://github.com/netty/netty/issues/1770
1062                     cancelConnectTimeoutFuture();
1063                     connectPromise = null;
1064                 }
1065             }
1066         }
1067 
        /**
         * Starts a non-blocking connect to {@code remoteAddress}, optionally binding to {@code localAddress}
         * first.
         * <p>
         * For an {@link InetSocketAddress} the connect is submitted either as a regular connect or, if TCP
         * fast-open is available, enabled and the current flushed message is a {@link ByteBuf}, as a
         * sendmsg(...) with {@code MSG_FASTOPEN} that carries this initial data. A
         * {@link DomainSocketAddress} is always submitted as a regular connect. Any failure while preparing
         * the connect fails the {@code promise}.
         *
         * @param remoteAddress the address to connect to.
         * @param localAddress  the address to bind to before connecting, or {@code null} to skip the bind.
         * @param promise       the promise to notify once the connect attempt completes.
         */
        @Override
        public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
            // non-blocking io.
            if (promise.isDone() || !ensureOpen(promise)) {
                return;
            }

            // A close is already pending, so there is no point in starting a connect.
            if (delayedClose != null) {
                promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
                return;
            }
            try {
                if (connectPromise != null) {
                    throw new ConnectionPendingException();
                }
                if (localAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) remoteAddress);
                }

                if (remote != null) {
                    // Check if already connected before trying to connect. This is needed as connect(...) will not
                    // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
                    // EINPROGRESS and finished later.
                    throw new AlreadyConnectedException();
                }

                if (localAddress != null) {
                    socket.bind(localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
                    // Only set if TCP fast-open should be used and there is a ByteBuf to send along.
                    ByteBuf initialData = null;
                    if (IoUring.isTcpFastOpenClientSideAvailable() &&
                        config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
                        ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
                        outbound.addFlush();
                        Object curr;
                        if ((curr = outbound.current()) instanceof ByteBuf) {
                            initialData = (ByteBuf) curr;
                        }
                    }
                    if (initialData != null) {
                        msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
                        MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
                        hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
                                initialData.readableBytes(), (short) 0);

                        int fd = fd().intValue();
                        IoRegistration registration = registration();
                        IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
                                hdr.address(), hdr.idx());
                        connectId = registration.submit(ops);
                        if (connectId == 0) {
                            // Directly release the memory if submitting failed.
                            freeMsgHdrArray();
                        }
                    } else {
                        submitConnect(inetSocketAddress);
                    }
                } else if (remoteAddress instanceof DomainSocketAddress) {
                    DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
                    submitConnect(unixDomainSocketAddress);
                } else {
                    throw new Error("Unexpected SocketAddress implementation " + className(remoteAddress));
                }

                // Only mark the connect as scheduled if the submission handed back a non-zero id.
                if (connectId != 0) {
                    ioState |= CONNECT_SCHEDULED;
                }
            } catch (Throwable t) {
                closeIfClosed();
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                return;
            }
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;
            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                        // Only close the channel if this task was the one that failed the promise.
                        if (connectPromise != null && !connectPromise.isDone() &&
                                connectPromise.tryFailure(new ConnectTimeoutException(
                                        "connection timed out: " + remoteAddress))) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    // If the connect future is cancelled we also cancel the timeout and close the
                    // underlying socket.
                    if (future.isCancelled()) {
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
1180     }
1181 
1182     private void submitConnect(InetSocketAddress inetSocketAddress) {
1183         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1184         remoteAddressMemory = cleanable.buffer();
1185 
1186         SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1187 
1188         int fd = fd().intValue();
1189         IoRegistration registration = registration();
1190         IoUringIoOps ops = IoUringIoOps.newConnect(
1191                 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1192         connectId = registration.submit(ops);
1193         if (connectId == 0) {
1194             // Directly release the memory if submitting failed.
1195             freeRemoteAddressMemory();
1196         }
1197     }
1198 
1199     private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1200         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1201         remoteAddressMemory = cleanable.buffer();
1202         SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1203         int fd = fd().intValue();
1204         IoRegistration registration = registration();
1205         long addr = Buffer.memoryAddress(remoteAddressMemory);
1206         IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, Native.SIZEOF_SOCKADDR_UN, nextOpsId());
1207         connectId = registration.submit(ops);
1208         if (connectId == 0) {
1209             // Directly release the memory if submitting failed.
1210             freeRemoteAddressMemory();
1211         }
1212     }
1213 
1214     @Override
1215     protected Object filterOutboundMessage(Object msg) {
1216         if (msg instanceof ByteBuf) {
1217             ByteBuf buf = (ByteBuf) msg;
1218             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1219         }
1220         throw new UnsupportedOperationException("unsupported message type");
1221     }
1222 
1223     @Override
1224     protected void doRegister(ChannelPromise promise) {
1225         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1226         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1227             if (f.isSuccess()) {
1228                 registration = (IoRegistration) f.getNow();
1229                 promise.setSuccess();
1230             } else {
1231                 promise.setFailure(f.cause());
1232             }
1233         });
1234     }
1235 
1236     @Override
1237     protected final void doDeregister() {
1238         // Cancel all previous submitted ops.
1239         ioUringUnsafe().cancelOps(connectPromise != null);
1240     }
1241 
    /**
     * Binds the socket to the given local address and stores the address that the socket reports
     * afterwards.
     *
     * @param local the address to bind to. An {@link InetSocketAddress} must be resolved, otherwise
     *              {@link #checkResolvable(InetSocketAddress)} throws.
     * @throws Exception if the bind fails.
     */
    @Override
    protected void doBind(final SocketAddress local) throws Exception {
        if (local instanceof InetSocketAddress) {
            checkResolvable((InetSocketAddress) local);
        }
        socket.bind(local);
        // Read the address back from the socket rather than reusing the requested one.
        this.local = socket.localAddress();
    }
1250 
1251     protected static void checkResolvable(InetSocketAddress addr) {
1252         if (addr.isUnresolved()) {
1253             throw new UnresolvedAddressException();
1254         }
1255     }
1256 
    /**
     * Returns the stored local address, which {@link #doBind(SocketAddress)} assigns after binding.
     *
     * @return the value of the {@code local} field.
     */
    @Override
    protected final SocketAddress localAddress0() {
        return local;
    }
1261 
    /**
     * Returns the stored remote address.
     *
     * @return the value of the {@code remote} field.
     */
    @Override
    protected final SocketAddress remoteAddress0() {
        return remote;
    }
1266 
1267     private static boolean isAllowHalfClosure(ChannelConfig config) {
1268         return config instanceof SocketChannelConfig &&
1269                ((SocketChannelConfig) config).isAllowHalfClosure();
1270     }
1271 
1272     private void cancelConnectTimeoutFuture() {
1273         if (connectTimeoutFuture != null) {
1274             connectTimeoutFuture.cancel(false);
1275             connectTimeoutFuture = null;
1276         }
1277     }
1278 
1279     private void computeRemote() {
1280         if (requestedRemoteAddress instanceof InetSocketAddress) {
1281             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1282         }
1283     }
1284 
1285     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1286         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1287     }
1288 
    /**
     * Returns whether the socket was guaranteed to be empty at the time the submitted io was executed and
     * the completion event was created.
     *
     * @param flags     the flags that were part of the completion
     * @return          {@code true} if empty.
     */
    protected abstract boolean socketIsEmpty(int flags);
1296 
    // NOTE(review): presumably tells whether a POLLIN has to be observed before the first read is
    // submitted - confirm against the implementing classes.
    abstract boolean isPollInFirst();
1298 }