View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.DomainSocketAddress;
43  import io.netty.channel.unix.Errors;
44  import io.netty.channel.unix.FileDescriptor;
45  import io.netty.channel.unix.UnixChannel;
46  import io.netty.channel.unix.UnixChannelUtil;
47  import io.netty.util.ReferenceCountUtil;
48  import io.netty.util.concurrent.PromiseNotifier;
49  import io.netty.util.internal.logging.InternalLogger;
50  import io.netty.util.internal.logging.InternalLoggerFactory;
51  
52  import java.io.IOException;
53  import java.net.InetSocketAddress;
54  import java.net.SocketAddress;
55  import java.nio.ByteBuffer;
56  import java.nio.channels.AlreadyConnectedException;
57  import java.nio.channels.ClosedChannelException;
58  import java.nio.channels.ConnectionPendingException;
59  import java.nio.channels.NotYetConnectedException;
60  import java.nio.channels.UnresolvedAddressException;
61  import java.util.concurrent.ScheduledFuture;
62  import java.util.concurrent.TimeUnit;
63  
64  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
65  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
66  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
67  import static io.netty.util.internal.ObjectUtil.checkNotNull;
68  
69  
70  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
71      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
72      final LinuxSocket socket;
73      protected volatile boolean active;
74  
75      // Different masks for outstanding I/O operations.
76      private static final int POLL_IN_SCHEDULED = 1;
77      private static final int POLL_OUT_SCHEDULED = 1 << 2;
78      private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
79      private static final int WRITE_SCHEDULED = 1 << 4;
80      private static final int READ_SCHEDULED = 1 << 5;
81      private static final int CONNECT_SCHEDULED = 1 << 6;
82  
83      private short opsId = Short.MIN_VALUE;
84  
85      private long pollInId;
86      private long pollOutId;
87      private long pollRdhupId;
88      private long connectId;
89  
90      // A byte is enough for now.
91      private byte ioState;
92  
93      // It's possible that multiple read / writes are issued. We need to keep track of these.
94      // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
95      // enough but let's be a bit more flexible for now.
96      private short numOutstandingWrites;
97      // A value of -1 means that multi-shot is used and so reads will be issued as long as the request is not canceled.
98      private short numOutstandingReads;
99  
100     private boolean readPending;
101     private boolean inReadComplete;
102     private boolean socketHasMoreData;
103 
104     private static final class DelayedClose {
105         private final ChannelPromise promise;
106         private final Throwable cause;
107         private final ClosedChannelException closeCause;
108 
109         DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
110             this.promise = promise;
111             this.cause = cause;
112             this.closeCause = closeCause;
113         }
114     }
115     private DelayedClose delayedClose;
116     private boolean inputClosedSeenErrorOnRead;
117 
118     /**
119      * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
120      */
121     private ChannelPromise connectPromise;
122     private ScheduledFuture<?> connectTimeoutFuture;
123     private SocketAddress requestedRemoteAddress;
124     private ByteBuffer remoteAddressMemory;
125     private MsgHdrMemoryArray msgHdrMemoryArray;
126 
127     private IoRegistration registration;
128 
129     private volatile SocketAddress local;
130     private volatile SocketAddress remote;
131 
132     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
133         super(parent);
134         this.socket = checkNotNull(socket, "fd");
135 
136         if (active) {
137             // Directly cache the remote and local addresses
138             // See https://github.com/netty/netty/issues/2359
139             this.active = true;
140             this.local = socket.localAddress();
141             this.remote = socket.remoteAddress();
142         }
143 
144         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
145     }
146 
147     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
148         super(parent);
149         this.socket = checkNotNull(fd, "fd");
150         this.active = true;
151 
152         // Directly cache the remote and local addresses
153         // See https://github.com/netty/netty/issues/2359
154         this.remote = remote;
155         this.local = fd.localAddress();
156     }
157 
158     // Called once a Channel changed from AUTO_READ=true to AUTO_READ=false
159     final void autoReadCleared() {
160         if (!isRegistered()) {
161             return;
162         }
163         readPending = false;
164         IoRegistration registration = this.registration;
165         if (registration == null || !registration.isValid()) {
166             return;
167         }
168         if (eventLoop().inEventLoop()) {
169             clearRead();
170         } else {
171             eventLoop().execute(this::clearRead);
172         }
173     }
174 
175     private void clearRead() {
176         assert eventLoop().inEventLoop();
177         readPending = false;
178         IoRegistration registration = this.registration;
179         if (registration == null || !registration.isValid()) {
180             return;
181         }
182         if ((ioState & POLL_IN_SCHEDULED) != 0) {
183             // There was a POLLIN scheduled, let's cancel it so we are not notified of any more reads for now.
184             assert pollInId != 0;
185             long id = registration.submit(
186                     IoUringIoOps.newAsyncCancel((byte) 0, pollInId, Native.IORING_OP_POLL_ADD));
187             assert id != 0;
188             ioState &= ~POLL_IN_SCHEDULED;
189         }
190         // Also cancel all outstanding reads as the user did signal there is no more desire to read.
191         cancelOutstandingReads(registration(), numOutstandingReads);
192     }
193 
194     /**
195      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
196      *
197      * @return  opsId
198      */
199     protected final short nextOpsId() {
200         short id = opsId++;
201 
202         // We use 0 for "none".
203         if (id == 0) {
204             id = opsId++;
205         }
206         return id;
207     }
208 
209     public final boolean isOpen() {
210         return socket.isOpen();
211     }
212 
213     @Override
214     public boolean isActive() {
215         return active;
216     }
217 
218     @Override
219     public final FileDescriptor fd() {
220         return socket;
221     }
222 
223     private AbstractUringUnsafe ioUringUnsafe() {
224         return (AbstractUringUnsafe) unsafe();
225     }
226 
227     @Override
228     protected boolean isCompatible(final EventLoop loop) {
229         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
230     }
231 
232     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
233         return newDirectBuffer(buf, buf);
234     }
235 
236     protected boolean allowMultiShotPollIn() {
237         return IoUring.isPollAddMultishotEnabled();
238     }
239 
240     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
241         final int readableBytes = buf.readableBytes();
242         if (readableBytes == 0) {
243             ReferenceCountUtil.release(holder);
244             return Unpooled.EMPTY_BUFFER;
245         }
246 
247         final ByteBufAllocator alloc = alloc();
248         if (alloc.isDirectBufferPooled()) {
249             return newDirectBuffer0(holder, buf, alloc, readableBytes);
250         }
251 
252         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
253         if (directBuf == null) {
254             return newDirectBuffer0(holder, buf, alloc, readableBytes);
255         }
256 
257         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
258         ReferenceCountUtil.safeRelease(holder);
259         return directBuf;
260     }
261 
262     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
263         final ByteBuf directBuf = alloc.directBuffer(capacity);
264         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
265         ReferenceCountUtil.safeRelease(holder);
266         return directBuf;
267     }
268 
269     /**
270      * Cancel all outstanding reads
271      *
272      * @param registration          the {@link IoRegistration}.
273      * @param numOutstandingReads   the number of outstanding reads, or {@code -1} if multi-shot was used.
274      */
275     protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
276 
277     /**
278      * Cancel all outstanding writes
279      *
280      * @param registration          the {@link IoRegistration}.
281      * @param numOutstandingWrites  the number of outstanding writes.
282      */
283     protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
284 
285     @Override
286     protected void doDisconnect() throws Exception {
287     }
288 
289     private void freeRemoteAddressMemory() {
290         if (remoteAddressMemory != null) {
291             Buffer.free(remoteAddressMemory);
292             remoteAddressMemory = null;
293         }
294     }
295 
296     private void freeMsgHdrArray() {
297         if (msgHdrMemoryArray != null) {
298             msgHdrMemoryArray.release();
299             msgHdrMemoryArray = null;
300         }
301     }
302 
303     @Override
304     protected void doClose() throws Exception {
305         active = false;
306 
307         if (registration != null) {
308             if (socket.markClosed()) {
309                 int fd = fd().intValue();
310                 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
311                 registration.submit(ops);
312             }
313         } else {
314             // This one was never registered just use a syscall to close.
315             socket.close();
316             ioUringUnsafe().freeResourcesNowIfNeeded(null);
317         }
318     }
319 
320     @Override
321     protected final void doBeginRead() {
322         if (inputClosedSeenErrorOnRead) {
323             // We did see an error while reading and so closed the input. Stop reading.
324             return;
325         }
326         if (readPending) {
327             // We already have a read pending.
328             return;
329         }
330         readPending = true;
331         if (inReadComplete || !isActive()) {
332             // We are currently in the readComplete(...) callback which might issue more reads by itself.
333             // If readComplete(...) will not issue more reads itself it will pick up the readPending flag, reset it and
334             // call doBeginReadNow().
335             return;
336         }
337         doBeginReadNow();
338     }
339 
340     private void doBeginReadNow() {
341         if (inputClosedSeenErrorOnRead) {
342             // We did see an error while reading and so closed the input.
343             return;
344         }
345         if (!isPollInFirst() ||
346                 // If the socket was not empty, and we stopped reading we need to ensure we just force the
347                 // read as POLLIN might be edge-triggered (in case of POLL_ADD_MULTI).
348                 socketHasMoreData) {
349             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
350             ioUringUnsafe().scheduleFirstReadIfNeeded();
351         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
352             ioUringUnsafe().schedulePollIn();
353         }
354     }
355 
356     @Override
357     protected void doWrite(ChannelOutboundBuffer in) {
358         scheduleWriteIfNeeded(in, true);
359     }
360 
361     protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
362         if ((ioState & WRITE_SCHEDULED) != 0) {
363             return;
364         }
365         if (scheduleWrite(in) > 0) {
366             ioState |= WRITE_SCHEDULED;
367             if (submitAndRunNow && !isWritable()) {
368                 submitAndRunNow();
369             }
370         }
371     }
372 
373     protected void submitAndRunNow() {
374         // NOOP
375     }
376 
377     private int scheduleWrite(ChannelOutboundBuffer in) {
378         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
379             return 0;
380         }
381         if (in == null) {
382             return 0;
383         }
384 
385         int msgCount = in.size();
386         if (msgCount == 0) {
387             return 0;
388         }
389         Object msg = in.current();
390 
391         if (msgCount > 1 && in.current() instanceof ByteBuf) {
392             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
393         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
394                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
395             // We also need some special handling for CompositeByteBuf
396             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
397         } else {
398             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
399         }
400         // Ensure we never overflow
401         assert numOutstandingWrites > 0;
402         return numOutstandingWrites;
403     }
404 
405     protected final IoRegistration registration() {
406         assert registration != null;
407         return registration;
408     }
409 
410     private void schedulePollOut() {
411         pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
412     }
413 
414     final void schedulePollRdHup() {
415         pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
416     }
417 
418     private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
419         assert (ioState & ioMask) == 0;
420         int fd = fd().intValue();
421         IoRegistration registration = registration();
422         IoUringIoOps ops = IoUringIoOps.newPollAdd(
423                 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
424         long id = registration.submit(ops);
425         if (id != 0) {
426             ioState |= (byte) ioMask;
427         }
428         return id;
429     }
430 
431     final void resetCachedAddresses() {
432         local = socket.localAddress();
433         remote = socket.remoteAddress();
434     }
435 
436     protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
437         private IoUringRecvByteAllocatorHandle allocHandle;
438         private boolean closed;
439         private boolean freed;
440         private boolean socketIsEmpty;
441 
442         /**
443          * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
444          * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
445          */
446         protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
447 
448         /**
449          * Schedule the write of a single message and returns the number of
450          * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
451          */
452         protected abstract int scheduleWriteSingle(Object msg);
453 
454         @Override
455         public final void handle(IoRegistration registration, IoEvent ioEvent) {
456             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
457             byte op = event.opcode();
458             int res = event.res();
459             int flags = event.flags();
460             short data = event.data();
461             switch (op) {
462                 case Native.IORING_OP_RECV:
463                 case Native.IORING_OP_ACCEPT:
464                 case Native.IORING_OP_RECVMSG:
465                 case Native.IORING_OP_READ:
466                     readComplete(op, res, flags, data);
467                     break;
468                 case Native.IORING_OP_WRITEV:
469                 case Native.IORING_OP_SEND:
470                 case Native.IORING_OP_SENDMSG:
471                 case Native.IORING_OP_WRITE:
472                 case Native.IORING_OP_SPLICE:
473                     writeComplete(op, res, flags, data);
474                     break;
475                 case Native.IORING_OP_POLL_ADD:
476                     pollAddComplete(res, flags, data);
477                     break;
478                 case Native.IORING_OP_ASYNC_CANCEL:
479                     cancelComplete0(op, res, flags, data);
480                     break;
481                 case Native.IORING_OP_CONNECT:
482                     connectComplete(op, res, flags, data);
483 
484                     // once the connect was completed we can also free some resources that are not needed anymore.
485                     freeMsgHdrArray();
486                     freeRemoteAddressMemory();
487                     break;
488                 case Native.IORING_OP_CLOSE:
489                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
490                         if (delayedClose != null) {
491                             delayedClose.promise.setSuccess();
492                         }
493                         closed = true;
494                     }
495                     break;
496                 default:
497                     break;
498             }
499 
500             // We delay the actual close if there is still a write or read scheduled, let's see if there
501             // was a close that needs to be done now.
502             handleDelayedClosed();
503 
504             if (ioState == 0 && closed) {
505                 freeResourcesNowIfNeeded(registration);
506             }
507         }
508 
509         private void freeResourcesNowIfNeeded(IoRegistration reg) {
510             if (!freed) {
511                 freed = true;
512                 freeResourcesNow(reg);
513             }
514         }
515 
516         /**
517          * Free all resources now. No new IO will be submitted for this channel via io_uring
518          *
519          * @param reg   the {@link IoRegistration} or {@code null} if it was never registered
520          */
521         protected void freeResourcesNow(IoRegistration reg) {
522             freeMsgHdrArray();
523             freeRemoteAddressMemory();
524             if (reg != null) {
525                 reg.cancel();
526             }
527         }
528 
529         private void handleDelayedClosed() {
530             if (delayedClose != null && canCloseNow()) {
531                 closeNow();
532             }
533         }
534 
535         private void pollAddComplete(int res, int flags, short data) {
536             if ((res & Native.POLLOUT) != 0) {
537                 pollOut(res);
538             }
539             if ((res & Native.POLLIN) != 0) {
540                 pollIn(res, flags, data);
541             }
542             if ((res & Native.POLLRDHUP) != 0) {
543                 pollRdHup(res);
544             }
545         }
546 
547         @Override
548         public final void close() throws Exception {
549             close(voidPromise());
550         }
551 
552         @Override
553         protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
554             if (closeFuture().isDone()) {
555                 // Closed already before.
556                 safeSetSuccess(promise);
557                 return;
558             }
559             if (delayedClose == null) {
560                 // We have a write operation pending that should be completed asap.
561                 // We will do the actual close operation one this write result is returned as otherwise
562                 // we may get into trouble as we may close the fd while we did not process the write yet.
563                 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
564             } else {
565                 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
566                 return;
567             }
568 
569             boolean cancelConnect = false;
570             try {
571                 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
572                 if (connectPromise != null) {
573                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
574                     connectPromise.tryFailure(new ClosedChannelException());
575                     AbstractIoUringChannel.this.connectPromise = null;
576                     cancelConnect = true;
577                 }
578 
579                 cancelConnectTimeoutFuture();
580             } finally {
581                 // It's important we cancel all outstanding connect, write and read operations now so
582                 // we will be able to process a delayed close if needed.
583                 cancelOps(cancelConnect);
584             }
585 
586             if (canCloseNow()) {
587                 // Currently there are is no WRITE and READ scheduled so we can start to teardown the channel.
588                 closeNow();
589             }
590         }
591 
592         private void cancelOps(boolean cancelConnect) {
593             if (registration == null || !registration.isValid()) {
594                 return;
595             }
596             byte flags = (byte) 0;
597             if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
598                 long id = registration.submit(
599                         IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
600                 assert id != 0;
601                 pollRdhupId = 0;
602             }
603             if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
604                 long id = registration.submit(
605                         IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
606                 assert id != 0;
607                 pollInId = 0;
608             }
609             if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
610                 long id = registration.submit(
611                         IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
612                 assert id != 0;
613                 pollOutId = 0;
614             }
615             if (cancelConnect && connectId != 0) {
616                 // Best effort to cancel the already submitted connect request.
617                 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
618                 assert id != 0;
619                 connectId = 0;
620             }
621             cancelOutstandingReads(registration, numOutstandingReads);
622             cancelOutstandingWrites(registration, numOutstandingWrites);
623         }
624 
625         private boolean canCloseNow() {
626             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
627             // problems related to re-ordering of completions.
628             return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
629         }
630 
631         private void closeNow() {
632             super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
633         }
634 
635         @Override
636         protected final void flush0() {
637             // Flush immediately only when there's no pending flush.
638             // If there's a pending flush operation, event loop will call forceFlush() later,
639             // and thus there's no need to call it now.
640             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
641                 super.flush0();
642             }
643         }
644 
645         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
646             if (promise == null) {
647                 // Closed via cancellation and the promise has been notified already.
648                 return;
649             }
650 
651             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
652             promise.tryFailure(cause);
653             closeIfClosed();
654         }
655 
656         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
657             if (promise == null) {
658                 // Closed via cancellation and the promise has been notified already.
659                 return;
660             }
661             active = true;
662 
663             if (local == null) {
664                 local = socket.localAddress();
665             }
666             computeRemote();
667 
668             // Register POLLRDHUP
669             schedulePollRdHup();
670 
671             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
672             // We still need to ensure we call fireChannelActive() in this case.
673             boolean active = isActive();
674 
675             // trySuccess() will return false if a user cancelled the connection attempt.
676             boolean promiseSet = promise.trySuccess();
677 
678             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
679             // because what happened is what happened.
680             if (!wasActive && active) {
681                 pipeline().fireChannelActive();
682             }
683 
684             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
685             if (!promiseSet) {
686                 close(voidPromise());
687             }
688         }
689 
690         @Override
691         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
692             if (allocHandle == null) {
693                 allocHandle = new IoUringRecvByteAllocatorHandle(
694                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
695             }
696             return allocHandle;
697         }
698 
699         final void shutdownInput(boolean allDataRead) {
700             logger.trace("shutdownInput Fd: {}", fd().intValue());
701             if (!socket.isInputShutdown()) {
702                 if (isAllowHalfClosure(config())) {
703                     try {
704                         socket.shutdown(true, false);
705                     } catch (IOException ignored) {
706                         // We attempted to shutdown and failed, which means the input has already effectively been
707                         // shutdown.
708                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
709                         return;
710                     } catch (NotYetConnectedException ignore) {
711                         // We attempted to shutdown and failed, which means the input has already effectively been
712                         // shutdown.
713                     }
714                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
715                 } else {
716                     // Handle this same way as if we did read all data so we don't schedule another read.
717                     inputClosedSeenErrorOnRead = true;
718                     close(voidPromise());
719                     return;
720                 }
721             }
722             if (allDataRead && !inputClosedSeenErrorOnRead) {
723                 inputClosedSeenErrorOnRead = true;
724                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
725             }
726         }
727 
728         private void fireEventAndClose(Object evt) {
729             pipeline().fireUserEventTriggered(evt);
730             close(voidPromise());
731         }
732 
733         final void schedulePollIn() {
734             assert (ioState & POLL_IN_SCHEDULED) == 0;
735             if (!isActive() || shouldBreakIoUringInReady(config())) {
736                 return;
737             }
738             pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
739         }
740 
        /**
         * Handles the completion of a submitted read (single-shot or multi-shot).
         *
         * <p>First updates the read bookkeeping ({@code READ_SCHEDULED}, {@code readPending},
         * {@code numOutstandingReads}). Then it delegates the actual processing to
         * {@link #readComplete0(byte, int, int, short, int)}. Finally it decides whether to submit a new read,
         * re-arm a multi-shot read, or cancel an outstanding multi-shot read.
         *
         * @param op    the op code.
         * @param res   the result; {@code Native.ERRNO_ECANCELED_NEGATIVE} if the read was cancelled.
         * @param flags the CQE flags; if {@code IORING_CQE_F_MORE} is not set no further completion will follow
         *              for this submission and the read must be re-armed.
         * @param data  the data that was passed when submitting the op.
         */
        private void readComplete(byte op, int res, int flags, short data) {
            assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;

            // numOutstandingReads == -1 marks a multi-shot read (see scheduleRead0(...)).
            boolean multishot = numOutstandingReads == -1;
            boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
            if (rearm) {
                // Reset READ_SCHEDULED if there is nothing more to handle and so we need to re-arm. This works for
                // multi-shot and non multi-shot variants.
                ioState &= ~READ_SCHEDULED;
            }
            // Remember whether a read was requested before readPending is (possibly) reset below. This is needed
            // when handling a cancelled read in the finally block.
            boolean pending = readPending;
            if (multishot) {
                // Reset readPending so we can still keep track if we might need to cancel the multi-shot read or
                // not.
                readPending = false;
            } else if (--numOutstandingReads == 0) {
                // We received all outstanding completions.
                readPending = false;
                ioState &= ~READ_SCHEDULED;
            }
            inReadComplete = true;
            try {
                socketIsEmpty = socketIsEmpty(flags);
                socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
                        (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
                readComplete0(op, res, flags, data, numOutstandingReads);
            } finally {
                try {
                    // Check if we should consider the read loop to be done.
                    if (recvBufAllocHandle().isReadComplete()) {
                        // Reset the handle as we are done with the read-loop.
                        recvBufAllocHandle().reset(config());

                        // Check if this was a readComplete(...) triggered by a read or multi-shot read.
                        if (!multishot) {
                            if (readPending) {
                                // This was a "normal" read and the user did signal we should continue reading.
                                // Let's schedule the read now.
                                doBeginReadNow();
                            }
                        } else {
                            // The readComplete(...) was triggered by a multi-shot read. Because of this the state
                            // machine is a bit more complicated.

                            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                                // The readComplete(...) was triggered because the previous read was cancelled.
                                // In this case we need to check if the user did signal the desire to read again
                                // in the meantime. If this is the case we need to schedule the read to ensure
                                // we do not stall.
                                if (pending) {
                                    doBeginReadNow();
                                }
                            } else if (rearm) {
                                // We need to rearm the multishot as otherwise we might miss some data.
                                doBeginReadNow();
                            } else if (!readPending) {
                                // Cancel the multi-shot read now as the user did not signal that we want to keep
                                // reading while we handle the completion event.
                                cancelOutstandingReads(registration, numOutstandingReads);
                            }
                        }
                    } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                        // The readComplete(...) was triggered because the previous read was cancelled.
                        // In this case we need to check if the user did signal the desire to read again
                        // in the meantime. If this is the case we need to schedule the read to ensure
                        // we do not stall.
                        if (pending) {
                            doBeginReadNow();
                        }
                    } else if (multishot && rearm) {
                        // We need to rearm the multishot as otherwise we might miss some data.
                        doBeginReadNow();
                    }
                } finally {
                    // socketIsEmpty is only valid while this completion is being processed.
                    inReadComplete = false;
                    socketIsEmpty = false;
                }
            }
        }
820 
        /**
         * Called once a read was completed, to process the completion.
         *
         * <p>Invoked by {@link #readComplete(byte, int, int, short)} after the read bookkeeping was updated.
         *
         * @param op                    the op code.
         * @param res                   the result of the completion.
         * @param flags                 the CQE flags.
         * @param data                  the data that was passed when submitting the op.
         * @param outstandingCompletes  the number of read completions that are still expected after this one, or
         *                              {@code -1} if the completed read is a multi-shot read.
         */
        protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
825 
826         /**
827          * Called once POLLRDHUP event is ready to be processed
828          */
829         private void pollRdHup(int res) {
830             ioState &= ~POLL_RDHUP_SCHEDULED;
831             pollRdhupId = 0;
832             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
833                 return;
834             }
835 
836             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
837             recvBufAllocHandle().rdHupReceived();
838 
839             if (isActive()) {
840                 scheduleFirstReadIfNeeded();
841             } else {
842                 // Just to be safe make sure the input marked as closed.
843                 shutdownInput(false);
844             }
845         }
846 
847         /**
848          * Called once POLLIN event is ready to be processed
849          */
850         private void pollIn(int res, int flags, short data) {
851             // Check if we need to rearm. This works for both cases, POLL_ADD and POLL_ADD_MULTI.
852             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
853             if (rearm) {
854                 ioState &= ~POLL_IN_SCHEDULED;
855                 pollInId = 0;
856             }
857             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
858                 return;
859             }
860             if (!readPending) {
861                 // We received the POLLIN but the user is not interested yet in reading, just mark socketHasMoreData
862                 // as true so we will trigger a read directly once the user calls read()
863                 socketHasMoreData = true;
864                 return;
865             }
866             scheduleFirstReadIfNeeded();
867         }
868 
869         private void scheduleFirstReadIfNeeded() {
870             if ((ioState & READ_SCHEDULED) == 0) {
871                 scheduleFirstRead();
872             }
873         }
874 
875         private void scheduleFirstRead() {
876             // This is a new "read loop" so we need to reset the allocHandle.
877             final ChannelConfig config = config();
878             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
879             allocHandle.reset(config);
880             scheduleRead(true);
881         }
882 
883         protected final void scheduleRead(boolean first) {
884             // Only schedule another read if the fd is still open.
885             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
886                 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
887                 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
888                     ioState |= READ_SCHEDULED;
889                 }
890             }
891         }
892 
        /**
         * Schedules a read and returns the number of {@link #readComplete(byte, int, int, short)}
         * calls that are expected because of the scheduled read.
         *
         * <p>{@link #scheduleRead(boolean)} stores the returned value as a {@code short}. It only treats the read as
         * scheduled if the value is positive or {@code -1}.
         *
         * @param first             {@code true} if this is the first read of a read loop.
         * @param socketIsEmpty     {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
         * @return                  the number of {@link #readComplete(byte, int, int, short)} calls expected or
         *                          {@code -1} if {@link #readComplete(byte, int, int, short)} is called until
         *                          the read is cancelled (multi-shot).
         */
        protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
904 
        /**
         * Called once POLLOUT event is ready to be processed.
         *
         * <p>If a connect is pending, this tries to finish it and re-arms POLLOUT if it is still in progress.
         * Otherwise it retries flushing outbound data, unless the output was shut down.
         *
         * @param res   the result; {@code Native.ERRNO_ECANCELED_NEGATIVE} if the poll was cancelled.
         */
        private void pollOut(int res) {
            ioState &= ~POLL_OUT_SCHEDULED;
            pollOutId = 0;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                // The poll was cancelled, nothing to do.
                return;
            }
            // pending connect
            if (connectPromise != null) {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.

                assert eventLoop().inEventLoop();

                boolean connectStillInProgress = false;
                try {
                    boolean wasActive = isActive();
                    if (!socket.finishConnect()) {
                        // Not connected yet: the finally block below registers for POLLOUT again.
                        connectStillInProgress = true;
                        return;
                    }
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                    if (!connectStillInProgress) {
                        // The connect attempt finished (successfully or not), so drop the timeout and the promise.
                        // cancelConnectTimeoutFuture() copes with no timeout being scheduled (one is only created if
                        // connectTimeoutMillis > 0).
                        // See https://github.com/netty/netty/issues/1770
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                    } else {
                        // The connect was not done yet, register for POLLOUT again
                        schedulePollOut();
                    }
                }
            } else if (!socket.isOutputShutdown()) {
                // Try writing again
                super.flush0();
            }
        }
950 
        /**
         * Called once a write was completed.
         *
         * <p>While {@code CONNECT_SCHEDULED} is set, the completion belongs to the {@code sendmsg(...)} that
         * {@code connect(...)} submitted for {@code TCP_FASTOPEN_CONNECT}, and it is handled as a connect completion.
         * Otherwise it is a regular write completion: it is passed to
         * {@link #writeComplete0(byte, int, int, short, int)}, and POLLOUT or a new write is scheduled as needed.
         *
         * @param op    the op code.
         * @param res   the result.
         * @param flags the flags.
         * @param data  the data that was passed when submitting the op.
         */
        private void writeComplete(byte op, int res, int flags, short data) {
            if ((ioState & CONNECT_SCHEDULED) != 0) {
                // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
                // TCP_FASTOPEN_CONNECT.
                freeMsgHdrArray();
                if (res > 0) {
                    // Connect complete!
                    outboundBuffer().removeBytes(res);

                    // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
                    connectComplete(op, 0, flags, data);
                } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
                    // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
                    // In this case, our TCP connection will be established normally, but no data was transmitted at
                    // this time. We'll just transmit the data with normal writes later.
                    // Let's submit a normal connect.
                    // NOTE(review): CONNECT_SCHEDULED stays set here. If submitConnect(...) fails to submit
                    // (connectId == 0), nothing in this path clears it. Confirm this is intended.
                    submitConnect((InetSocketAddress) requestedRemoteAddress);
                } else {
                    // There was an error, handle it as a normal connect error.
                    connectComplete(op, res, flags, data);
                }
                return;
            }
            assert numOutstandingWrites > 0;
            --numOutstandingWrites;

            boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
            if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {

                // We were not able to write everything, let's register for POLLOUT
                schedulePollOut();
            }

            // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
            // while still removing messages internally in removeBytes(...) which then may corrupt state.
            if (numOutstandingWrites == 0) {
                ioState &= ~WRITE_SCHEDULED;

                // If we could write all and we did not schedule a pollout yet let us try to write again
                if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
                    scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
                }
            }
        }
1003 
        /**
         * Called once a write was completed.
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         * @param outstanding   the number of write completions still outstanding after this one.
         * @return              {@code true} if everything was written. If {@code false} is returned,
         *                      {@link #writeComplete(byte, int, int, short)} registers for POLLOUT (unless POLLOUT
         *                      is already scheduled).
         */
        abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1013 
        /**
         * Called once a cancel was completed. This default implementation does nothing; it is a hook that
         * subclasses may override.
         *
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         */
        void cancelComplete0(byte op, int res, int flags, short data) {
            // NOOP
        }
1025 
        /**
         * Called once a connect was completed.
         *
         * <p>An {@code EINPROGRESS}/{@code EALREADY} result means the connect has not finished yet, and POLLOUT is
         * scheduled to wait for it. Any other result completes the connect promise (success for {@code 0},
         * failure otherwise), cancels the connect timeout and clears the pending promise.
         *
         * @param op            the op code.
         * @param res           the result; {@code 0} on success, otherwise an error code that is turned into a
         *                      connect exception via {@code Errors.throwConnectException(...)}.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         */
        void connectComplete(byte op, int res, int flags, short data) {
            ioState &= ~CONNECT_SCHEDULED;
            freeRemoteAddressMemory();

            if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
                // connect not complete yet need to wait for poll_out event
                schedulePollOut();
            } else {
                try {
                    if (res == 0) {
                        fulfillConnectPromise(connectPromise, active);
                        // The user already asked to read while connecting; start reading right away.
                        if (readPending) {
                            doBeginReadNow();
                        }
                    } else {
                        // Convert the error code into the matching connect exception and fail the promise with it.
                        try {
                            Errors.throwConnectException("io_uring connect", res);
                        } catch (Throwable cause) {
                            fulfillConnectPromise(connectPromise, cause);
                        }
                    }
                } finally {
                    // cancelConnectTimeoutFuture() copes with no timeout being scheduled (one is only created if a
                    // connectTimeoutMillis > 0 is used).
                    // See https://github.com/netty/netty/issues/1770
                    cancelConnectTimeoutFuture();
                    connectPromise = null;
                }
            }
        }
1063 
        /*
         * Starts an asynchronous connect.
         *
         * For InetSocketAddress with TCP_FASTOPEN_CONNECT enabled (and supported), the first flushed ByteBuf (if any)
         * is sent together with the connect via sendmsg(..., MSG_FASTOPEN). Its completion is then handled by
         * writeComplete(...) while CONNECT_SCHEDULED is set. Otherwise a plain connect op is submitted.
         * The promise is only remembered once the submission did not throw. A timeout is scheduled if
         * connectTimeoutMillis > 0, and cancelling the promise closes the channel.
         */
        @Override
        public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
            // non-blocking io.
            if (promise.isDone() || !ensureOpen(promise)) {
                return;
            }

            if (delayedClose != null) {
                promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
                return;
            }
            try {
                if (connectPromise != null) {
                    throw new ConnectionPendingException();
                }
                if (localAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) remoteAddress);
                }

                if (remote != null) {
                    // Check if already connected before trying to connect. This is needed as connect(...) will not
                    // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
                    // EINPROGRESS and finished later.
                    throw new AlreadyConnectedException();
                }

                if (localAddress != null) {
                    socket.bind(localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
                    ByteBuf initialData = null;
                    // With TCP fast open, try to piggy-back the first flushed ByteBuf on the connect.
                    if (IoUring.isTcpFastOpenClientSideAvailable() &&
                        config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
                        ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
                        outbound.addFlush();
                        Object curr;
                        if ((curr = outbound.current()) instanceof ByteBuf) {
                            initialData = (ByteBuf) curr;
                        }
                    }
                    if (initialData != null) {
                        msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
                        MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
                        hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
                                initialData.readableBytes(), (short) 0);

                        int fd = fd().intValue();
                        IoRegistration registration = registration();
                        IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
                                hdr.address(), hdr.idx());
                        connectId = registration.submit(ops);
                        if (connectId == 0) {
                            // Directly release the memory if submitting failed.
                            freeMsgHdrArray();
                        }
                    } else {
                        submitConnect(inetSocketAddress);
                    }
                } else if (remoteAddress instanceof DomainSocketAddress) {
                    DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
                    submitConnect(unixDomainSocketAddress);
                } else {
                    throw new Error("Unexpected SocketAddress implementation " + remoteAddress);
                }

                // Only mark the connect as scheduled if an op was actually submitted.
                if (connectId != 0) {
                    ioState |= CONNECT_SCHEDULED;
                }
            } catch (Throwable t) {
                closeIfClosed();
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                return;
            }
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;
            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                        if (connectPromise != null && !connectPromise.isDone() &&
                                connectPromise.tryFailure(new ConnectTimeoutException(
                                        "connection timed out: " + remoteAddress))) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    // If the connect future is cancelled we also cancel the timeout and close the
                    // underlying socket.
                    if (future.isCancelled()) {
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
1176     }
1177 
1178     private void submitConnect(InetSocketAddress inetSocketAddress) {
1179         remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1180 
1181         SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1182 
1183         int fd = fd().intValue();
1184         IoRegistration registration = registration();
1185         IoUringIoOps ops = IoUringIoOps.newConnect(
1186                 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1187         connectId = registration.submit(ops);
1188         if (connectId == 0) {
1189             // Directly release the memory if submitting failed.
1190             freeRemoteAddressMemory();
1191         }
1192     }
1193 
1194     private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1195         remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1196         SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1197         int fd = fd().intValue();
1198         IoRegistration registration = registration();
1199         long addr = Buffer.memoryAddress(remoteAddressMemory);
1200         IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, Native.SIZEOF_SOCKADDR_UN, nextOpsId());
1201         connectId = registration.submit(ops);
1202         if (connectId == 0) {
1203             // Directly release the memory if submitting failed.
1204             freeRemoteAddressMemory();
1205         }
1206     }
1207 
1208     @Override
1209     protected Object filterOutboundMessage(Object msg) {
1210         if (msg instanceof ByteBuf) {
1211             ByteBuf buf = (ByteBuf) msg;
1212             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1213         }
1214         throw new UnsupportedOperationException("unsupported message type");
1215     }
1216 
1217     @Override
1218     protected void doRegister(ChannelPromise promise) {
1219         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1220         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1221             if (f.isSuccess()) {
1222                 registration = (IoRegistration) f.getNow();
1223                 promise.setSuccess();
1224             } else {
1225                 promise.setFailure(f.cause());
1226             }
1227         });
1228     }
1229 
1230     @Override
1231     protected final void doDeregister() {
1232         // Cancel all previous submitted ops.
1233         ioUringUnsafe().cancelOps(connectPromise != null);
1234     }
1235 
1236     @Override
1237     protected void doBind(final SocketAddress local) throws Exception {
1238         if (local instanceof InetSocketAddress) {
1239             checkResolvable((InetSocketAddress) local);
1240         }
1241         socket.bind(local);
1242         this.local = socket.localAddress();
1243     }
1244 
1245     protected static void checkResolvable(InetSocketAddress addr) {
1246         if (addr.isUnresolved()) {
1247             throw new UnresolvedAddressException();
1248         }
1249     }
1250 
1251     @Override
1252     protected final SocketAddress localAddress0() {
1253         return local;
1254     }
1255 
1256     @Override
1257     protected final SocketAddress remoteAddress0() {
1258         return remote;
1259     }
1260 
1261     private static boolean isAllowHalfClosure(ChannelConfig config) {
1262         return config instanceof SocketChannelConfig &&
1263                ((SocketChannelConfig) config).isAllowHalfClosure();
1264     }
1265 
1266     private void cancelConnectTimeoutFuture() {
1267         if (connectTimeoutFuture != null) {
1268             connectTimeoutFuture.cancel(false);
1269             connectTimeoutFuture = null;
1270         }
1271     }
1272 
1273     private void computeRemote() {
1274         if (requestedRemoteAddress instanceof InetSocketAddress) {
1275             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1276         }
1277     }
1278 
1279     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1280         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1281     }
1282 
    /**
     * Returns whether the socket was guaranteed to be empty when the submitted io was executed and the completion
     * event was created.
     *
     * <p>While a read completion is processed, the result is kept in the {@code socketIsEmpty} field and passed on
     * to {@code scheduleRead0(...)}.
     *
     * @param flags     the flags that were part of the completion.
     * @return          {@code true} if empty.
     */
    protected abstract boolean socketIsEmpty(int flags);
1290 
    /**
     * NOTE(review): not referenced in this part of the file. Presumably reports whether a POLLIN should be armed
     * before the first read is submitted; confirm against the implementations.
     *
     * @return  implementation-specific flag (see note above).
     */
    abstract boolean isPollInFirst();
1292 }