View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.DomainSocketAddress;
43  import io.netty.channel.unix.Errors;
44  import io.netty.channel.unix.FileDescriptor;
45  import io.netty.channel.unix.UnixChannel;
46  import io.netty.channel.unix.UnixChannelUtil;
47  import io.netty.util.ReferenceCountUtil;
48  import io.netty.util.concurrent.PromiseNotifier;
49  import io.netty.util.internal.CleanableDirectBuffer;
50  import io.netty.util.internal.logging.InternalLogger;
51  import io.netty.util.internal.logging.InternalLoggerFactory;
52  
53  import java.io.IOException;
54  import java.net.InetSocketAddress;
55  import java.net.SocketAddress;
56  import java.nio.ByteBuffer;
57  import java.nio.channels.AlreadyConnectedException;
58  import java.nio.channels.ClosedChannelException;
59  import java.nio.channels.ConnectionPendingException;
60  import java.nio.channels.NotYetConnectedException;
61  import java.nio.channels.UnresolvedAddressException;
62  import java.util.concurrent.ScheduledFuture;
63  import java.util.concurrent.TimeUnit;
64  
65  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
66  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
67  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
68  import static io.netty.util.internal.ObjectUtil.checkNotNull;
69  
70  
71  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
72      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
73      final LinuxSocket socket;
74      protected volatile boolean active;
75  
76      // Different masks for outstanding I/O operations.
77      private static final int POLL_IN_SCHEDULED = 1;
78      private static final int POLL_OUT_SCHEDULED = 1 << 2;
79      private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
80      private static final int WRITE_SCHEDULED = 1 << 4;
81      private static final int READ_SCHEDULED = 1 << 5;
82      private static final int CONNECT_SCHEDULED = 1 << 6;
83  
84      private short opsId = Short.MIN_VALUE;
85  
86      private long pollInId;
87      private long pollOutId;
88      private long pollRdhupId;
89      private long connectId;
90  
91      // A byte is enough for now.
92      private byte ioState;
93  
94      // It's possible that multiple read / writes are issued. We need to keep track of these.
95      // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
96      // enough but let's be a bit more flexible for now.
97      private short numOutstandingWrites;
98      // A value of -1 means that multi-shot is used and so reads will be issued as long as the request is not canceled.
99      private short numOutstandingReads;
100 
101     private boolean readPending;
102     private boolean inReadComplete;
103     private boolean socketHasMoreData;
104 
105     private static final class DelayedClose {
106         private final ChannelPromise promise;
107         private final Throwable cause;
108         private final ClosedChannelException closeCause;
109 
110         DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
111             this.promise = promise;
112             this.cause = cause;
113             this.closeCause = closeCause;
114         }
115     }
116     private DelayedClose delayedClose;
117     private boolean inputClosedSeenErrorOnRead;
118 
119     /**
120      * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
121      */
122     private ChannelPromise connectPromise;
123     private ScheduledFuture<?> connectTimeoutFuture;
124     private SocketAddress requestedRemoteAddress;
125     private CleanableDirectBuffer cleanable;
126     private ByteBuffer remoteAddressMemory;
127     private MsgHdrMemoryArray msgHdrMemoryArray;
128 
129     private IoRegistration registration;
130 
131     private volatile SocketAddress local;
132     private volatile SocketAddress remote;
133 
134     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
135         super(parent);
136         this.socket = checkNotNull(socket, "fd");
137 
138         if (active) {
139             // Directly cache the remote and local addresses
140             // See https://github.com/netty/netty/issues/2359
141             this.active = true;
142             this.local = socket.localAddress();
143             this.remote = socket.remoteAddress();
144         }
145 
146         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
147     }
148 
149     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
150         super(parent);
151         this.socket = checkNotNull(fd, "fd");
152         this.active = true;
153 
154         // Directly cache the remote and local addresses
155         // See https://github.com/netty/netty/issues/2359
156         this.remote = remote;
157         this.local = fd.localAddress();
158     }
159 
160     // Called once a Channel changed from AUTO_READ=true to AUTO_READ=false
161     final void autoReadCleared() {
162         if (!isRegistered()) {
163             return;
164         }
165         IoRegistration registration = this.registration;
166         if (registration == null || !registration.isValid()) {
167             return;
168         }
169         if (eventLoop().inEventLoop()) {
170             clearRead();
171         } else {
172             eventLoop().execute(this::clearRead);
173         }
174     }
175 
176     private void clearRead() {
177         assert eventLoop().inEventLoop();
178         readPending = false;
179         IoRegistration registration = this.registration;
180         if (registration == null || !registration.isValid()) {
181             return;
182         }
183         // Also cancel all outstanding reads as the user did signal there is no more desire to read.
184         cancelOutstandingReads(registration(), numOutstandingReads);
185     }
186 
187     /**
188      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
189      *
190      * @return  opsId
191      */
192     protected final short nextOpsId() {
193         short id = opsId++;
194 
195         // We use 0 for "none".
196         if (id == 0) {
197             id = opsId++;
198         }
199         return id;
200     }
201 
202     public final boolean isOpen() {
203         return socket.isOpen();
204     }
205 
206     @Override
207     public boolean isActive() {
208         return active;
209     }
210 
211     @Override
212     public final FileDescriptor fd() {
213         return socket;
214     }
215 
216     private AbstractUringUnsafe ioUringUnsafe() {
217         return (AbstractUringUnsafe) unsafe();
218     }
219 
220     @Override
221     protected boolean isCompatible(final EventLoop loop) {
222         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
223     }
224 
225     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
226         return newDirectBuffer(buf, buf);
227     }
228 
229     protected boolean allowMultiShotPollIn() {
230         return IoUring.isPollAddMultishotEnabled();
231     }
232 
233     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
234         final int readableBytes = buf.readableBytes();
235         if (readableBytes == 0) {
236             ReferenceCountUtil.release(holder);
237             return Unpooled.EMPTY_BUFFER;
238         }
239 
240         final ByteBufAllocator alloc = alloc();
241         if (alloc.isDirectBufferPooled()) {
242             return newDirectBuffer0(holder, buf, alloc, readableBytes);
243         }
244 
245         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
246         if (directBuf == null) {
247             return newDirectBuffer0(holder, buf, alloc, readableBytes);
248         }
249 
250         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
251         ReferenceCountUtil.safeRelease(holder);
252         return directBuf;
253     }
254 
255     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
256         final ByteBuf directBuf = alloc.directBuffer(capacity);
257         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
258         ReferenceCountUtil.safeRelease(holder);
259         return directBuf;
260     }
261 
262     /**
263      * Cancel all outstanding reads
264      *
265      * @param registration          the {@link IoRegistration}.
266      * @param numOutstandingReads   the number of outstanding reads, or {@code -1} if multi-shot was used.
267      */
268     protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
269 
270     /**
271      * Cancel all outstanding writes
272      *
273      * @param registration          the {@link IoRegistration}.
274      * @param numOutstandingWrites  the number of outstanding writes.
275      */
276     protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
277 
278     @Override
279     protected void doDisconnect() throws Exception {
280     }
281 
282     private void freeRemoteAddressMemory() {
283         if (remoteAddressMemory != null) {
284             cleanable.clean();
285             cleanable = null;
286             remoteAddressMemory = null;
287         }
288     }
289 
290     private void freeMsgHdrArray() {
291         if (msgHdrMemoryArray != null) {
292             msgHdrMemoryArray.release();
293             msgHdrMemoryArray = null;
294         }
295     }
296 
297     @Override
298     protected void doClose() throws Exception {
299         active = false;
300 
301         if (registration != null) {
302             if (socket.markClosed()) {
303                 int fd = fd().intValue();
304                 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
305                 registration.submit(ops);
306             }
307         } else {
308             // This one was never registered just use a syscall to close.
309             socket.close();
310             ioUringUnsafe().freeResourcesNowIfNeeded(null);
311         }
312     }
313 
314     @Override
315     protected final void doBeginRead() {
316         if (inputClosedSeenErrorOnRead) {
317             // We did see an error while reading and so closed the input. Stop reading.
318             return;
319         }
320         if (readPending) {
321             // We already have a read pending.
322             return;
323         }
324         readPending = true;
325         if (inReadComplete || !isActive()) {
326             // We are currently in the readComplete(...) callback which might issue more reads by itself.
327             // If readComplete(...) will not issue more reads itself it will pick up the readPending flag, reset it and
328             // call doBeginReadNow().
329             return;
330         }
331         doBeginReadNow();
332     }
333 
334     private void doBeginReadNow() {
335         if (inputClosedSeenErrorOnRead) {
336             // We did see an error while reading and so closed the input.
337             return;
338         }
339         if (!isPollInFirst() ||
340                 // If the socket was not empty, and we stopped reading we need to ensure we just force the
341                 // read as POLLIN might be edge-triggered (in case of POLL_ADD_MULTI).
342                 socketHasMoreData) {
343             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
344             ioUringUnsafe().scheduleFirstReadIfNeeded();
345         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
346             ioUringUnsafe().schedulePollIn();
347         }
348     }
349 
350     @Override
351     protected void doWrite(ChannelOutboundBuffer in) {
352         scheduleWriteIfNeeded(in, true);
353     }
354 
355     protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
356         if ((ioState & WRITE_SCHEDULED) != 0) {
357             return;
358         }
359         if (scheduleWrite(in) > 0) {
360             ioState |= WRITE_SCHEDULED;
361             if (submitAndRunNow && !isWritable()) {
362                 submitAndRunNow();
363             }
364         }
365     }
366 
367     protected void submitAndRunNow() {
368         // NOOP
369     }
370 
371     private int scheduleWrite(ChannelOutboundBuffer in) {
372         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
373             return 0;
374         }
375         if (in == null) {
376             return 0;
377         }
378 
379         int msgCount = in.size();
380         if (msgCount == 0) {
381             return 0;
382         }
383         Object msg = in.current();
384 
385         if (msgCount > 1 && in.current() instanceof ByteBuf) {
386             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
387         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
388                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
389             // We also need some special handling for CompositeByteBuf
390             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
391         } else {
392             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
393         }
394         // Ensure we never overflow
395         assert numOutstandingWrites > 0;
396         return numOutstandingWrites;
397     }
398 
399     protected final IoRegistration registration() {
400         assert registration != null;
401         return registration;
402     }
403 
404     private void schedulePollOut() {
405         pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
406     }
407 
408     final void schedulePollRdHup() {
409         pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
410     }
411 
412     private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
413         assert (ioState & ioMask) == 0;
414         int fd = fd().intValue();
415         IoRegistration registration = registration();
416         IoUringIoOps ops = IoUringIoOps.newPollAdd(
417                 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
418         long id = registration.submit(ops);
419         if (id != 0) {
420             ioState |= (byte) ioMask;
421         }
422         return id;
423     }
424 
425     final void resetCachedAddresses() {
426         local = socket.localAddress();
427         remote = socket.remoteAddress();
428     }
429 
430     protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
431         private IoUringRecvByteAllocatorHandle allocHandle;
432         private boolean closed;
433         private boolean freed;
434         private boolean socketIsEmpty;
435 
436         /**
437          * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
438          * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
439          */
440         protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
441 
442         /**
443          * Schedule the write of a single message and returns the number of
444          * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
445          */
446         protected abstract int scheduleWriteSingle(Object msg);
447 
448         @Override
449         public final void handle(IoRegistration registration, IoEvent ioEvent) {
450             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
451             byte op = event.opcode();
452             int res = event.res();
453             int flags = event.flags();
454             short data = event.data();
455             switch (op) {
456                 case Native.IORING_OP_RECV:
457                 case Native.IORING_OP_ACCEPT:
458                 case Native.IORING_OP_RECVMSG:
459                 case Native.IORING_OP_READ:
460                     readComplete(op, res, flags, data);
461                     break;
462                 case Native.IORING_OP_WRITEV:
463                 case Native.IORING_OP_SEND:
464                 case Native.IORING_OP_SENDMSG:
465                 case Native.IORING_OP_WRITE:
466                 case Native.IORING_OP_SPLICE:
467                 case Native.IORING_OP_SEND_ZC:
468                 case Native.IORING_OP_SENDMSG_ZC:
469                     writeComplete(op, res, flags, data);
470                     break;
471                 case Native.IORING_OP_POLL_ADD:
472                     pollAddComplete(res, flags, data);
473                     break;
474                 case Native.IORING_OP_ASYNC_CANCEL:
475                     cancelComplete0(op, res, flags, data);
476                     break;
477                 case Native.IORING_OP_CONNECT:
478                     connectComplete(op, res, flags, data);
479 
480                     // once the connect was completed we can also free some resources that are not needed anymore.
481                     freeMsgHdrArray();
482                     freeRemoteAddressMemory();
483                     break;
484                 case Native.IORING_OP_CLOSE:
485                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
486                         if (delayedClose != null) {
487                             delayedClose.promise.setSuccess();
488                         }
489                         closed = true;
490                     }
491                     break;
492                 default:
493                     break;
494             }
495 
496             // We delay the actual close if there is still a write or read scheduled, let's see if there
497             // was a close that needs to be done now.
498             handleDelayedClosed();
499 
500             if (ioState == 0 && closed) {
501                 freeResourcesNowIfNeeded(registration);
502             }
503         }
504 
505         private void freeResourcesNowIfNeeded(IoRegistration reg) {
506             if (!freed) {
507                 freed = true;
508                 freeResourcesNow(reg);
509             }
510         }
511 
512         /**
513          * Free all resources now. No new IO will be submitted for this channel via io_uring
514          *
515          * @param reg   the {@link IoRegistration} or {@code null} if it was never registered
516          */
517         protected void freeResourcesNow(IoRegistration reg) {
518             freeMsgHdrArray();
519             freeRemoteAddressMemory();
520             if (reg != null) {
521                 reg.cancel();
522             }
523         }
524 
525         private void handleDelayedClosed() {
526             if (delayedClose != null && canCloseNow()) {
527                 closeNow();
528             }
529         }
530 
531         private void pollAddComplete(int res, int flags, short data) {
532             if ((res & Native.POLLOUT) != 0) {
533                 pollOut(res);
534             }
535             if ((res & Native.POLLIN) != 0) {
536                 pollIn(res, flags, data);
537             }
538             if ((res & Native.POLLRDHUP) != 0) {
539                 pollRdHup(res);
540             }
541         }
542 
543         @Override
544         public final void close() throws Exception {
545             close(voidPromise());
546         }
547 
        /**
         * Starts closing the channel. The first request is recorded in {@code delayedClose}. It fails any pending
         * connect, cancels outstanding operations, and closes immediately only if no read or write is scheduled.
         * Otherwise the close happens later from handle(...). Later requests complete together with the first.
         */
        @Override
        protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
            if (closeFuture().isDone()) {
                // Closed already before.
                safeSetSuccess(promise);
                return;
            }
            if (delayedClose == null) {
                // First close request: remember it. The actual close may have to wait until scheduled reads /
                // writes completed, as closing the fd before their completions were processed could get us into
                // trouble. A void promise is replaced by a real one so completion can still be observed.
                delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
            } else {
                // A close is already in progress; complete this promise once the first one completes.
                delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
                return;
            }

            boolean cancelConnect = false;
            try {
                ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                if (connectPromise != null) {
                    // Use tryFailure() instead of setFailure() to avoid the race against cancel().
                    connectPromise.tryFailure(new ClosedChannelException());
                    AbstractIoUringChannel.this.connectPromise = null;
                    cancelConnect = true;
                }

                cancelConnectTimeoutFuture();
            } finally {
                // It's important we cancel all outstanding connect, write and read operations now so
                // we will be able to process a delayed close if needed.
                cancelOps(cancelConnect);
            }

            if (canCloseNow()) {
                // Currently there is no WRITE and READ scheduled so we can start to tear down the channel.
                closeNow();
            }
        }
587 
588         private void cancelOps(boolean cancelConnect) {
589             if (registration == null || !registration.isValid()) {
590                 return;
591             }
592             byte flags = (byte) 0;
593             if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
594                 long id = registration.submit(
595                         IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
596                 assert id != 0;
597                 pollRdhupId = 0;
598             }
599             if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
600                 long id = registration.submit(
601                         IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
602                 assert id != 0;
603                 pollInId = 0;
604             }
605             if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
606                 long id = registration.submit(
607                         IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
608                 assert id != 0;
609                 pollOutId = 0;
610             }
611             if (cancelConnect && connectId != 0) {
612                 // Best effort to cancel the already submitted connect request.
613                 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
614                 assert id != 0;
615                 connectId = 0;
616             }
617             cancelOutstandingReads(registration, numOutstandingReads);
618             cancelOutstandingWrites(registration, numOutstandingWrites);
619         }
620 
621         private boolean canCloseNow() {
622             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
623             // problems related to re-ordering of completions.
624             return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
625         }
626 
627         protected boolean canCloseNow0() {
628             return true;
629         }
630 
631         private void closeNow() {
632             super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
633         }
634 
635         @Override
636         protected final void flush0() {
637             // Flush immediately only when there's no pending flush.
638             // If there's a pending flush operation, event loop will call forceFlush() later,
639             // and thus there's no need to call it now.
640             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
641                 super.flush0();
642             }
643         }
644 
645         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
646             if (promise == null) {
647                 // Closed via cancellation and the promise has been notified already.
648                 return;
649             }
650 
651             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
652             promise.tryFailure(cause);
653             closeIfClosed();
654         }
655 
656         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
657             if (promise == null) {
658                 // Closed via cancellation and the promise has been notified already.
659                 return;
660             }
661             active = true;
662 
663             if (local == null) {
664                 local = socket.localAddress();
665             }
666             computeRemote();
667 
668             // Register POLLRDHUP
669             schedulePollRdHup();
670 
671             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
672             // We still need to ensure we call fireChannelActive() in this case.
673             boolean active = isActive();
674 
675             // trySuccess() will return false if a user cancelled the connection attempt.
676             boolean promiseSet = promise.trySuccess();
677 
678             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
679             // because what happened is what happened.
680             if (!wasActive && active) {
681                 pipeline().fireChannelActive();
682             }
683 
684             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
685             if (!promiseSet) {
686                 close(voidPromise());
687             }
688         }
689 
690         @Override
691         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
692             if (allocHandle == null) {
693                 allocHandle = new IoUringRecvByteAllocatorHandle(
694                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
695             }
696             return allocHandle;
697         }
698 
        /**
         * Reacts to the input side being done. If the socket input is not shut down yet, it is shut down and a
         * ChannelInputShutdownEvent is fired when half-closure is allowed. When half-closure is not allowed, the
         * channel is closed instead.
         *
         * @param allDataRead if {@code true}, fires ChannelInputShutdownReadComplete (once) and marks the input as
         *                    closed so no further reads are issued
         */
        final void shutdownInput(boolean allDataRead) {
            logger.trace("shutdownInput Fd: {}", fd().intValue());
            if (!socket.isInputShutdown()) {
                if (isAllowHalfClosure(config())) {
                    try {
                        socket.shutdown(true, false);
                    } catch (IOException ignored) {
                        // We attempted to shutdown and failed, which means the input has already effectively been
                        // shutdown. Fire the event and close the whole channel.
                        fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
                        return;
                    } catch (NotYetConnectedException ignore) {
                        // We attempted to shutdown and failed, which means the input has already effectively been
                        // shutdown. Fall through and fire the shutdown event below.
                    }
                    pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                } else {
                    // Handle this same way as if we did read all data so we don't schedule another read.
                    inputClosedSeenErrorOnRead = true;
                    close(voidPromise());
                    return;
                }
            }
            // The flag doubles as a "read-complete event already fired" guard.
            if (allDataRead && !inputClosedSeenErrorOnRead) {
                inputClosedSeenErrorOnRead = true;
                pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
            }
        }
727 
728         private void fireEventAndClose(Object evt) {
729             pipeline().fireUserEventTriggered(evt);
730             close(voidPromise());
731         }
732 
733         final void schedulePollIn() {
734             assert (ioState & POLL_IN_SCHEDULED) == 0;
735             if (!isActive() || shouldBreakIoUringInReady(config())) {
736                 return;
737             }
738             pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
739         }
740 
        /**
         * Handles the completion of a read, which can be either a normal read or a multi-shot read. Afterwards it
         * decides whether another read has to be scheduled, the multi-shot read re-armed, or outstanding reads
         * cancelled.
         *
         * @param op    the op code.
         * @param res   the result; {@code Native.ERRNO_ECANCELED_NEGATIVE} if the read was cancelled.
         * @param flags the CQE flags.
         * @param data  the data that was passed when submitting the op.
         */
        private void readComplete(byte op, int res, int flags, short data) {
            assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;

            // numOutstandingReads == -1 marks a multi-shot read (see the scheduleRead0(...) contract).
            boolean multishot = numOutstandingReads == -1;
            // Without IORING_CQE_F_MORE no further completions will arrive for this submission.
            boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
            if (rearm) {
                // Reset READ_SCHEDULED if there is nothing more to handle and so we need to re-arm. This works for
                // multi-shot and non multi-shot variants.
                ioState &= ~READ_SCHEDULED;
            }
            // Snapshot readPending before it may be cleared below; it is consulted again when handling a
            // cancelled read.
            boolean pending = readPending;
            if (multishot) {
                // Reset readPending so we can still keep track if we might need to cancel the multi-shot read or
                // not.
                readPending = false;
            } else if (--numOutstandingReads == 0) {
                // We received all outstanding completions.
                readPending = false;
                ioState &= ~READ_SCHEDULED;
            }
            inReadComplete = true;
            try {
                socketIsEmpty = socketIsEmpty(flags);
                socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
                        (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
                readComplete0(op, res, flags, data, numOutstandingReads);
            } finally {
                try {
                    // Check if we should consider the read loop to be done.
                    if (recvBufAllocHandle().isReadComplete()) {
                        // Reset the handle as we are done with the read-loop.
                        recvBufAllocHandle().reset(config());

                        // Check if this was a readComplete(...) triggered by a read or multi-shot read.
                        if (!multishot) {
                            if (readPending) {
                                // This was a "normal" read and the user did signal we should continue reading.
                                // Let's schedule the read now.
                                doBeginReadNow();
                            }
                        } else {
                            // The readComplete(...) was triggered by a multi-shot read. Because of this the state
                            // machine is a bit more complicated.

                            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                                // The readComplete(...) was triggered because the previous read was cancelled.
                                // In this case we need to check if the user did signal the desire to read again
                                // in the meantime. If this is the case we need to schedule the read to ensure
                                // we do not stall.
                                if (pending) {
                                    doBeginReadNow();
                                }
                            } else if (rearm) {
                                // We need to rearm the multishot as otherwise we might miss some data.
                                doBeginReadNow();
                            } else if (!readPending) {
                                // Cancel the multi-shot read now as the user did not signal that we want to keep
                                // reading while we handle the completion event.
                                cancelOutstandingReads(registration, numOutstandingReads);
                            }
                        }
                    } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                        // The readComplete(...) was triggered because the previous read was cancelled.
                        // In this case we need to check if the user did signal the desire to read again
                        // in the meantime. If this is the case we need to schedule the read to ensure
                        // we do not stall.
                        if (pending) {
                            doBeginReadNow();
                        }
                    } else if (multishot && rearm) {
                        // We need to rearm the multishot as otherwise we might miss some data.
                        doBeginReadNow();
                    }
                } finally {
                    // These flags are only meaningful while a completion is being processed.
                    inReadComplete = false;
                    socketIsEmpty = false;
                }
            }
        }
820 
        /**
         * Called once a read was completed.
         *
         * @param op                    the op code.
         * @param res                   the result of the read.
         * @param flags                 the CQE flags.
         * @param data                  the data that was passed when submitting the op.
         * @param outstandingCompletes  for a non multi-shot read, the number of read completions still expected
         *                              after this one; {@code -1} for a multi-shot read.
         */
        protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
825 
826         /**
827          * Called once POLLRDHUP event is ready to be processed
828          */
829         private void pollRdHup(int res) {
830             ioState &= ~POLL_RDHUP_SCHEDULED;
831             pollRdhupId = 0;
832             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
833                 return;
834             }
835 
836             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
837             recvBufAllocHandle().rdHupReceived();
838 
839             if (isActive()) {
840                 scheduleFirstReadIfNeeded();
841             } else {
842                 // Just to be safe make sure the input marked as closed.
843                 shutdownInput(false);
844             }
845         }
846 
847         /**
848          * Called once POLLIN event is ready to be processed
849          */
850         private void pollIn(int res, int flags, short data) {
851             // Check if we need to rearm. This works for both cases, POLL_ADD and POLL_ADD_MULTI.
852             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
853             if (rearm) {
854                 ioState &= ~POLL_IN_SCHEDULED;
855                 pollInId = 0;
856             }
857             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
858                 return;
859             }
860             if (!readPending) {
861                 // We received the POLLIN but the user is not interested yet in reading, just mark socketHasMoreData
862                 // as true so we will trigger a read directly once the user calls read()
863                 socketHasMoreData = true;
864                 return;
865             }
866             scheduleFirstReadIfNeeded();
867         }
868 
869         private void scheduleFirstReadIfNeeded() {
870             if ((ioState & READ_SCHEDULED) == 0) {
871                 scheduleFirstRead();
872             }
873         }
874 
875         private void scheduleFirstRead() {
876             // This is a new "read loop" so we need to reset the allocHandle.
877             final ChannelConfig config = config();
878             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
879             allocHandle.reset(config);
880             scheduleRead(true);
881         }
882 
883         protected final void scheduleRead(boolean first) {
884             // Only schedule another read if the fd is still open.
885             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
886                 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
887                 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
888                     ioState |= READ_SCHEDULED;
889                 }
890             }
891         }
892 
        /**
         * Schedule a read and returns the number of {@link #readComplete(byte, int, int, short)}
         * calls that are expected because of the scheduled read.
         *
         * @param first             {@code true} if this is the first read of a read loop.
         * @param socketIsEmpty     {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
         * @return                  the number of {@link #readComplete(byte, int, int, short)} calls expected or
         *                          {@code -1} if {@link #readComplete(byte, int, int, short)} is called until
         *                          the read is cancelled (multi-shot). Any other value (for example {@code 0})
         *                          means nothing was scheduled, and {@code READ_SCHEDULED} is left unset.
         *                          The caller stores the value as a {@code short}.
         */
        protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
904 
        /**
         * Called once POLLOUT event is ready to be processed.
         *
         * <p>If a connect is pending, this tries to finish it. It re-arms POLLOUT when the connect is still in
         * progress. Otherwise it retries flushing unless the output is shut down.
         *
         * @param res   the result; {@code Native.ERRNO_ECANCELED_NEGATIVE} if the poll was cancelled.
         */
        private void pollOut(int res) {
            ioState &= ~POLL_OUT_SCHEDULED;
            pollOutId = 0;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                return;
            }
            // pending connect
            if (connectPromise != null) {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.

                assert eventLoop().inEventLoop();

                boolean connectStillInProgress = false;
                try {
                    boolean wasActive = isActive();
                    if (!socket.finishConnect()) {
                        connectStillInProgress = true;
                        // The finally block below still runs and re-registers for POLLOUT.
                        return;
                    }
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                    if (!connectStillInProgress) {
                        // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0
                        // is used
                        // See https://github.com/netty/netty/issues/1770
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                    } else {
                        // The connect was not done yet, register for POLLOUT again
                        schedulePollOut();
                    }
                }
            } else if (!socket.isOutputShutdown()) {
                // Try writing again
                super.flush0();
            }
        }
950 
        /**
         * Called once a write was completed.
         *
         * <p>While {@code CONNECT_SCHEDULED} is set, the completion belongs to the {@code sendmsg(...)} that was
         * submitted for {@code TCP_FASTOPEN_CONNECT}, and it is handled as a connect result. Otherwise the
         * completion is forwarded to {@link #writeComplete0(byte, int, int, short, int)}. POLLOUT or a further
         * write is then scheduled as needed.
         *
         * @param op    the op code.
         * @param res   the result.
         * @param flags the flags.
         * @param data  the data that was passed when submitting the op.
         */
        private void writeComplete(byte op, int res, int flags, short data) {
            if ((ioState & CONNECT_SCHEDULED) != 0) {
                // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
                // TCP_FASTOPEN_CONNECT.
                freeMsgHdrArray();
                if (res > 0) {
                    // Connect complete!
                    outboundBuffer().removeBytes(res);

                    // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
                    connectComplete(op, 0, flags, data);
                } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
                    // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
                    // In this case, our TCP connection will be established normally, but no data was transmitted at
                    // this time. We'll just transmit the data with normal writes later.
                    // Let's submit a normal connect.
                    submitConnect((InetSocketAddress) requestedRemoteAddress);
                } else {
                    // There was an error, handle it as a normal connect error.
                    connectComplete(op, res, flags, data);
                }
                return;
            }

            // CQEs flagged with IORING_CQE_F_NOTIF are not counted as write completions (presumably the
            // notification CQEs of zero-copy sends).
            if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
                assert numOutstandingWrites > 0;
                --numOutstandingWrites;
            }

            boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
            if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {

                // We were not able to write everything, let's register for POLLOUT
                schedulePollOut();
            }

            // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
            // while still removing messages internally in removeBytes(...) which then may corrupt state.
            if (numOutstandingWrites == 0) {
                ioState &= ~WRITE_SCHEDULED;

                // If we could write all and we did not schedule a pollout yet let us try to write again
                if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
                    scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
                }
            }
        }
1006 
        /**
         * Called once a write was completed.
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         * @param outstanding   the number of write completions still outstanding after this one (completions
         *                      flagged with {@code IORING_CQE_F_NOTIF} are not counted).
         * @return              {@code true} if everything could be written; on {@code false} the caller
         *                      registers for POLLOUT if that is not already scheduled.
         */
        abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1016 
        /**
         * Called once a cancel was completed. The default implementation does nothing; subclasses may override it.
         *
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         */
        void cancelComplete0(byte op, int res, int flags, short data) {
            // NOOP
        }
1028 
        /**
         * Called once a connect was completed.
         *
         * <p>{@code EINPROGRESS} or {@code EALREADY} means the connect is still running, so POLLOUT is scheduled to
         * wait for it. Any other result completes the connect promise, either successfully ({@code res == 0}) or
         * with a connect exception. In that case the connect timeout is cancelled and the promise is cleared.
         *
         * @param op            the op code.
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         */
        void connectComplete(byte op, int res, int flags, short data) {
            ioState &= ~CONNECT_SCHEDULED;
            // The native sockaddr used for the connect submission is no longer needed.
            freeRemoteAddressMemory();

            if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
                // connect not complete yet need to wait for poll_out event
                schedulePollOut();
            } else {
                try {
                    if (res == 0) {
                        fulfillConnectPromise(connectPromise, active);
                        if (readPending) {
                            doBeginReadNow();
                        }
                    } else {
                        try {
                            // Converts the negative errno into the matching connect exception.
                            Errors.throwConnectException("io_uring connect", res);
                        } catch (Throwable cause) {
                            fulfillConnectPromise(connectPromise, cause);
                        }
                    }
                } finally {
                    // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
                    // used
                    // See https://github.com/netty/netty/issues/1770
                    cancelConnectTimeoutFuture();
                    connectPromise = null;
                }
            }
        }
1066 
        /**
         * Starts a non-blocking connect to {@code remoteAddress}, optionally binding to {@code localAddress} first.
         *
         * <p>For {@link InetSocketAddress} targets with {@code TCP_FASTOPEN_CONNECT} enabled (and supported), the
         * first flushed {@link ByteBuf} is sent as part of the connect via {@code sendmsg(..., MSG_FASTOPEN)}.
         * That completion is then handled by {@code writeComplete(...)} while {@code CONNECT_SCHEDULED} is set.
         * Otherwise a plain connect is submitted. A connect timeout is scheduled when
         * {@code getConnectTimeoutMillis() > 0}. Cancelling {@code promise} cancels the timeout and closes the
         * channel.
         *
         * @param remoteAddress the address to connect to.
         * @param localAddress  the local address to bind to first, or {@code null}.
         * @param promise       the promise that is completed once the connect finished or failed.
         */
        @Override
        public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
            // non-blocking io.
            if (promise.isDone() || !ensureOpen(promise)) {
                return;
            }

            if (delayedClose != null) {
                promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
                return;
            }
            try {
                if (connectPromise != null) {
                    throw new ConnectionPendingException();
                }
                if (localAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) remoteAddress);
                }

                if (remote != null) {
                    // Check if already connected before trying to connect. This is needed as connect(...) will not
                    // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
                    // EINPROGRESS and finished later.
                    throw new AlreadyConnectedException();
                }

                if (localAddress != null) {
                    socket.bind(localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
                    ByteBuf initialData = null;
                    if (IoUring.isTcpFastOpenClientSideAvailable() &&
                        config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
                        // Flush pending writes so the first buffered message can be sent with the connect.
                        ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
                        outbound.addFlush();
                        Object curr;
                        if ((curr = outbound.current()) instanceof ByteBuf) {
                            initialData = (ByteBuf) curr;
                        }
                    }
                    if (initialData != null) {
                        msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
                        MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
                        hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
                                initialData.readableBytes(), (short) 0);

                        int fd = fd().intValue();
                        IoRegistration registration = registration();
                        IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
                                hdr.address(), hdr.idx());
                        connectId = registration.submit(ops);
                        if (connectId == 0) {
                            // Directly release the memory if submitting failed.
                            freeMsgHdrArray();
                        }
                    } else {
                        submitConnect(inetSocketAddress);
                    }
                } else if (remoteAddress instanceof DomainSocketAddress) {
                    DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
                    submitConnect(unixDomainSocketAddress);
                } else {
                    throw new Error("Unexpected SocketAddress implementation " + remoteAddress);
                }

                // NOTE(review): when submission failed (connectId == 0), CONNECT_SCHEDULED stays unset but
                // connectPromise is still assigned below. Presumably the connect timeout (if configured) or a close
                // completes it. Confirm.
                if (connectId != 0) {
                    ioState |= CONNECT_SCHEDULED;
                }
            } catch (Throwable t) {
                closeIfClosed();
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                return;
            }
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;
            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                        if (connectPromise != null && !connectPromise.isDone() &&
                                connectPromise.tryFailure(new ConnectTimeoutException(
                                        "connection timed out: " + remoteAddress))) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    // If the connect future is cancelled we also cancel the timeout and close the
                    // underlying socket.
                    if (future.isCancelled()) {
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
1179     }
1180 
1181     private void submitConnect(InetSocketAddress inetSocketAddress) {
1182         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1183         remoteAddressMemory = cleanable.buffer();
1184 
1185         SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1186 
1187         int fd = fd().intValue();
1188         IoRegistration registration = registration();
1189         IoUringIoOps ops = IoUringIoOps.newConnect(
1190                 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1191         connectId = registration.submit(ops);
1192         if (connectId == 0) {
1193             // Directly release the memory if submitting failed.
1194             freeRemoteAddressMemory();
1195         }
1196     }
1197 
1198     private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1199         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1200         remoteAddressMemory = cleanable.buffer();
1201         SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1202         int fd = fd().intValue();
1203         IoRegistration registration = registration();
1204         long addr = Buffer.memoryAddress(remoteAddressMemory);
1205         IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, Native.SIZEOF_SOCKADDR_UN, nextOpsId());
1206         connectId = registration.submit(ops);
1207         if (connectId == 0) {
1208             // Directly release the memory if submitting failed.
1209             freeRemoteAddressMemory();
1210         }
1211     }
1212 
1213     @Override
1214     protected Object filterOutboundMessage(Object msg) {
1215         if (msg instanceof ByteBuf) {
1216             ByteBuf buf = (ByteBuf) msg;
1217             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1218         }
1219         throw new UnsupportedOperationException("unsupported message type");
1220     }
1221 
1222     @Override
1223     protected void doRegister(ChannelPromise promise) {
1224         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1225         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1226             if (f.isSuccess()) {
1227                 registration = (IoRegistration) f.getNow();
1228                 promise.setSuccess();
1229             } else {
1230                 promise.setFailure(f.cause());
1231             }
1232         });
1233     }
1234 
1235     @Override
1236     protected final void doDeregister() {
1237         // Cancel all previous submitted ops.
1238         ioUringUnsafe().cancelOps(connectPromise != null);
1239     }
1240 
1241     @Override
1242     protected void doBind(final SocketAddress local) throws Exception {
1243         if (local instanceof InetSocketAddress) {
1244             checkResolvable((InetSocketAddress) local);
1245         }
1246         socket.bind(local);
1247         this.local = socket.localAddress();
1248     }
1249 
1250     protected static void checkResolvable(InetSocketAddress addr) {
1251         if (addr.isUnresolved()) {
1252             throw new UnresolvedAddressException();
1253         }
1254     }
1255 
1256     @Override
1257     protected final SocketAddress localAddress0() {
1258         return local;
1259     }
1260 
1261     @Override
1262     protected final SocketAddress remoteAddress0() {
1263         return remote;
1264     }
1265 
1266     private static boolean isAllowHalfClosure(ChannelConfig config) {
1267         return config instanceof SocketChannelConfig &&
1268                ((SocketChannelConfig) config).isAllowHalfClosure();
1269     }
1270 
1271     private void cancelConnectTimeoutFuture() {
1272         if (connectTimeoutFuture != null) {
1273             connectTimeoutFuture.cancel(false);
1274             connectTimeoutFuture = null;
1275         }
1276     }
1277 
1278     private void computeRemote() {
1279         if (requestedRemoteAddress instanceof InetSocketAddress) {
1280             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1281         }
1282     }
1283 
1284     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1285         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1286     }
1287 
    /**
     * Returns whether the socket was guaranteed to be empty at the time the submitted I/O was executed and its
     * completion event was created.
     * @param flags     the flags that were part of the completion
     * @return          {@code true} if empty.
     */
    protected abstract boolean socketIsEmpty(int flags);
1295 
    // NOTE(review): presumably reports whether a POLLIN should be armed before the first read is attempted;
    // the call sites are not visible here, so confirm against the implementations.
    abstract boolean isPollInFirst();
1297 }