View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.Errors;
43  import io.netty.channel.unix.FileDescriptor;
44  import io.netty.channel.unix.UnixChannel;
45  import io.netty.channel.unix.UnixChannelUtil;
46  import io.netty.util.ReferenceCountUtil;
47  import io.netty.util.concurrent.PromiseNotifier;
48  import io.netty.util.internal.logging.InternalLogger;
49  import io.netty.util.internal.logging.InternalLoggerFactory;
50  
51  import java.io.IOException;
52  import java.net.InetSocketAddress;
53  import java.net.SocketAddress;
54  import java.nio.ByteBuffer;
55  import java.nio.channels.AlreadyConnectedException;
56  import java.nio.channels.ClosedChannelException;
57  import java.nio.channels.ConnectionPendingException;
58  import java.nio.channels.NotYetConnectedException;
59  import java.nio.channels.UnresolvedAddressException;
60  import java.util.concurrent.ScheduledFuture;
61  import java.util.concurrent.TimeUnit;
62  
63  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
64  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
65  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
66  import static io.netty.util.internal.ObjectUtil.checkNotNull;
67  
68  
69  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
    // The socket backing this channel; exposed through fd().
    final LinuxSocket socket;
    // Returned by isActive(); volatile so that updates are visible across threads.
    protected volatile boolean active;

    // Different masks for outstanding I/O operations. They are combined in ioState.
    // Bit 1 (value 2) is currently unused. ioState is a byte, so only 8 bits are available.
    private static final int POLL_IN_SCHEDULED = 1;
    private static final int POLL_OUT_SCHEDULED = 1 << 2;
    private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
    private static final int WRITE_SCHEDULED = 1 << 4;
    private static final int READ_SCHEDULED = 1 << 5;
    private static final int CONNECT_SCHEDULED = 1 << 6;

    // Source of the ids handed out by nextOpsId(); starts at Short.MIN_VALUE and wraps around on overflow.
    private short opsId = Short.MIN_VALUE;

    // Ids of the submitted POLL_ADD / CONNECT requests. They are used to cancel these requests again via
    // IORING_OP_ASYNC_CANCEL (see clearRead() and cancelOps(...)).
    private long pollInId;
    private long pollOutId;
    private long pollRdhupId;
    private long connectId;

    // Bitset of the *_SCHEDULED masks above. A byte is enough for now.
    private byte ioState;

    // It's possible that multiple reads / writes are issued, so we need to keep track of them.
    // The number of pending writes and reads is limited to Short.MAX_VALUE. Byte.MAX_VALUE might also be enough,
    // but a short leaves a bit more headroom for now.
    private short numOutstandingWrites;
    // A value of -1 means that multi-shot is used, so reads keep being issued as long as the request is not canceled.
    private short numOutstandingReads;

    // True while a read requested via doBeginRead() has not been handled yet.
    private boolean readPending;
    // True while readComplete(...) runs; doBeginRead() then leaves scheduling the next read to readComplete(...).
    private boolean inReadComplete;
    // Set from IORING_CQE_F_SOCK_NONEMPTY (if supported) on the last read completion: the socket still had data.
    private boolean socketHasMoreData;
102 
103     private static final class DelayedClose {
104         private final ChannelPromise promise;
105         private final Throwable cause;
106         private final ClosedChannelException closeCause;
107 
108         DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
109             this.promise = promise;
110             this.cause = cause;
111             this.closeCause = closeCause;
112         }
113     }
    // Close request that is deferred while a READ / WRITE is still scheduled; created by the unsafe's close(...).
    private DelayedClose delayedClose;
    // Set once reading was stopped because the input was shut down / closed; doBeginRead() is a no-op afterwards.
    private boolean inputClosedSeenErrorOnRead;

    /**
     * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    // Pending connect-timeout task; cancelled on close via cancelConnectTimeoutFuture().
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;
    // Native memory used by a connect; freed once IORING_OP_CONNECT completed or when resources are freed.
    private ByteBuffer remoteAddressMemory;
    private MsgHdrMemoryArray msgHdrMemoryArray;

    // Registration with the io_uring event loop; null if the channel was never registered (see doClose()).
    private IoRegistration registration;

    // Cached local / remote addresses; volatile for cross-thread visibility.
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
130 
131     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
132         super(parent);
133         this.socket = checkNotNull(socket, "fd");
134 
135         if (active) {
136             // Directly cache the remote and local addresses
137             // See https://github.com/netty/netty/issues/2359
138             this.active = true;
139             this.local = socket.localAddress();
140             this.remote = socket.remoteAddress();
141         }
142 
143         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
144     }
145 
146     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
147         super(parent);
148         this.socket = checkNotNull(fd, "fd");
149         this.active = true;
150 
151         // Directly cache the remote and local addresses
152         // See https://github.com/netty/netty/issues/2359
153         this.remote = remote;
154         this.local = fd.localAddress();
155     }
156 
157     // Called once a Channel changed from AUTO_READ=true to AUTO_READ=false
158     final void autoReadCleared() {
159         if (!isRegistered()) {
160             return;
161         }
162         readPending = false;
163         IoRegistration registration = this.registration;
164         if (registration == null || !registration.isValid()) {
165             return;
166         }
167         if (eventLoop().inEventLoop()) {
168             clearRead();
169         } else {
170             eventLoop().execute(this::clearRead);
171         }
172     }
173 
174     private void clearRead() {
175         assert eventLoop().inEventLoop();
176         readPending = false;
177         IoRegistration registration = this.registration;
178         if (registration == null || !registration.isValid()) {
179             return;
180         }
181         if ((ioState & POLL_IN_SCHEDULED) != 0) {
182             // There was a POLLIN scheduled, let's cancel it so we are not notified of any more reads for now.
183             assert pollInId != 0;
184             long id = registration.submit(
185                     IoUringIoOps.newAsyncCancel((byte) 0, pollInId, Native.IORING_OP_POLL_ADD));
186             assert id != 0;
187         }
188         // Also cancel all outstanding reads as the user did signal there is no more desire to read.
189         cancelOutstandingReads(registration(), numOutstandingReads);
190     }
191 
192     /**
193      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
194      *
195      * @return  opsId
196      */
197     protected final short nextOpsId() {
198         short id = opsId++;
199 
200         // We use 0 for "none".
201         if (id == 0) {
202             id = opsId++;
203         }
204         return id;
205     }
206 
207     public final boolean isOpen() {
208         return socket.isOpen();
209     }
210 
211     @Override
212     public boolean isActive() {
213         return active;
214     }
215 
216     @Override
217     public final FileDescriptor fd() {
218         return socket;
219     }
220 
221     private AbstractUringUnsafe ioUringUnsafe() {
222         return (AbstractUringUnsafe) unsafe();
223     }
224 
225     @Override
226     protected boolean isCompatible(final EventLoop loop) {
227         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
228     }
229 
230     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
231         return newDirectBuffer(buf, buf);
232     }
233 
234     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
235         final int readableBytes = buf.readableBytes();
236         if (readableBytes == 0) {
237             ReferenceCountUtil.release(holder);
238             return Unpooled.EMPTY_BUFFER;
239         }
240 
241         final ByteBufAllocator alloc = alloc();
242         if (alloc.isDirectBufferPooled()) {
243             return newDirectBuffer0(holder, buf, alloc, readableBytes);
244         }
245 
246         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
247         if (directBuf == null) {
248             return newDirectBuffer0(holder, buf, alloc, readableBytes);
249         }
250 
251         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
252         ReferenceCountUtil.safeRelease(holder);
253         return directBuf;
254     }
255 
256     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
257         final ByteBuf directBuf = alloc.directBuffer(capacity);
258         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
259         ReferenceCountUtil.safeRelease(holder);
260         return directBuf;
261     }
262 
    /**
     * Cancel all outstanding reads.
     * <p>
     * Called from {@code clearRead()} and {@code cancelOps(...)} regardless of the count, so implementations must
     * cope with {@code numOutstandingReads == 0}.
     *
     * @param registration          the {@link IoRegistration} used to submit the cancellation(s).
     * @param numOutstandingReads   the number of outstanding reads, or {@code -1} if multi-shot was used.
     */
    protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
270 
    /**
     * Cancel all outstanding writes.
     * <p>
     * Called from {@code cancelOps(...)} regardless of the count, so implementations must cope with
     * {@code numOutstandingWrites == 0}.
     *
     * @param registration          the {@link IoRegistration} used to submit the cancellation(s).
     * @param numOutstandingWrites  the number of outstanding writes.
     */
    protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
278 
279     @Override
280     protected void doDisconnect() throws Exception {
281     }
282 
283     protected byte flags(byte flags) {
284         return flags;
285     }
286 
287     private void freeRemoteAddressMemory() {
288         if (remoteAddressMemory != null) {
289             Buffer.free(remoteAddressMemory);
290             remoteAddressMemory = null;
291         }
292     }
293 
294     private void freeMsgHdrArray() {
295         if (msgHdrMemoryArray != null) {
296             msgHdrMemoryArray.release();
297             msgHdrMemoryArray = null;
298         }
299     }
300 
301     @Override
302     protected void doClose() throws Exception {
303         active = false;
304 
305         if (registration != null) {
306             if (socket.markClosed()) {
307                 int fd = fd().intValue();
308                 IoUringIoOps ops = IoUringIoOps.newClose(fd, flags((byte) 0), nextOpsId());
309                 registration.submit(ops);
310             }
311         } else {
312             // This one was never registered just use a syscall to close.
313             socket.close();
314             ioUringUnsafe().freeResourcesNowIfNeeded(null);
315         }
316     }
317 
318     @Override
319     protected final void doBeginRead() {
320         if (inputClosedSeenErrorOnRead) {
321             // We did see an error while reading and so closed the input. Stop reading.
322             return;
323         }
324         if (readPending) {
325             // We already have a read pending.
326             return;
327         }
328         readPending = true;
329         if (inReadComplete || !isActive()) {
330             // We are currently in the readComplete(...) callback which might issue more reads by itself.
331             // If readComplete(...) will not issue more reads itself it will pick up the readPending flag, reset it and
332             // call doBeginReadNow().
333             return;
334         }
335         doBeginReadNow();
336     }
337 
338     private void doBeginReadNow() {
339         if (inputClosedSeenErrorOnRead) {
340             // We did see an error while reading and so closed the input.
341             return;
342         }
343         if (!isPollInFirst() ||
344                 // If the socket was not empty, and we stopped reading we need to ensure we just force the
345                 // read as POLLIN might be edge-triggered (in case of POLL_ADD_MULTI).
346                 socketHasMoreData) {
347             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
348             ioUringUnsafe().scheduleFirstReadIfNeeded();
349         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
350             ioUringUnsafe().schedulePollIn();
351         }
352     }
353 
354     @Override
355     protected void doWrite(ChannelOutboundBuffer in) {
356         scheduleWriteIfNeeded(in, true);
357     }
358 
359     protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
360         if ((ioState & WRITE_SCHEDULED) != 0) {
361             return;
362         }
363         if (scheduleWrite(in) > 0) {
364             ioState |= WRITE_SCHEDULED;
365             if (submitAndRunNow && !isWritable()) {
366                 submitAndRunNow();
367             }
368         }
369     }
370 
371     protected void submitAndRunNow() {
372         // NOOP
373     }
374 
375     private int scheduleWrite(ChannelOutboundBuffer in) {
376         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
377             return 0;
378         }
379         if (in == null) {
380             return 0;
381         }
382 
383         int msgCount = in.size();
384         if (msgCount == 0) {
385             return 0;
386         }
387         Object msg = in.current();
388 
389         if (msgCount > 1 && in.current() instanceof ByteBuf) {
390             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
391         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
392                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
393             // We also need some special handling for CompositeByteBuf
394             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
395         } else {
396             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
397         }
398         // Ensure we never overflow
399         assert numOutstandingWrites > 0;
400         return numOutstandingWrites;
401     }
402 
403     protected final IoRegistration registration() {
404         assert registration != null;
405         return registration;
406     }
407 
408     private void schedulePollOut() {
409         pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
410     }
411 
412     final void schedulePollRdHup() {
413         pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
414     }
415 
416     private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
417         assert (ioState & ioMask) == 0;
418         int fd = fd().intValue();
419         IoRegistration registration = registration();
420         IoUringIoOps ops = IoUringIoOps.newPollAdd(
421                 fd, flags((byte) 0), mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
422         long id = registration.submit(ops);
423         if (id != 0) {
424             ioState |= (byte) ioMask;
425         }
426         return id;
427     }
428 
429     final void resetCachedAddresses() {
430         local = socket.localAddress();
431         remote = socket.remoteAddress();
432     }
433 
434     abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
        // Lazily created wrapper around the RecvByteBufAllocator handle (see recvBufAllocHandle()).
        private IoUringRecvByteAllocatorHandle allocHandle;
        // Set once an IORING_OP_CLOSE completed without being cancelled.
        private boolean closed;
        // Guards freeResourcesNow(...) so resources are freed at most once.
        private boolean freed;
        // Result of socketIsEmpty(flags) for the last read completion.
        private boolean socketIsEmpty;
439 
        /**
         * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param in the outbound buffer whose messages should be written.
         * @return the number of expected write completions. The caller stores it as a {@code short} and asserts
         *         that the result is positive.
         */
        protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
445 
        /**
         * Schedule the write of a single message and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param msg the message to write (the current message of the outbound buffer).
         * @return the number of expected write completions. The caller stores it as a {@code short} and asserts
         *         that the result is positive.
         */
        protected abstract int scheduleWriteSingle(Object msg);
451 
452         @Override
453         public final void handle(IoRegistration registration, IoEvent ioEvent) {
454             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
455             byte op = event.opcode();
456             int res = event.res();
457             int flags = event.flags();
458             short data = event.data();
459             switch (op) {
460                 case Native.IORING_OP_RECV:
461                 case Native.IORING_OP_ACCEPT:
462                 case Native.IORING_OP_RECVMSG:
463                 case Native.IORING_OP_READ:
464                     readComplete(op, res, flags, data);
465                     break;
466                 case Native.IORING_OP_WRITEV:
467                 case Native.IORING_OP_SEND:
468                 case Native.IORING_OP_SENDMSG:
469                 case Native.IORING_OP_WRITE:
470                 case Native.IORING_OP_SPLICE:
471                     writeComplete(op, res, flags, data);
472                     break;
473                 case Native.IORING_OP_POLL_ADD:
474                     pollAddComplete(res, flags, data);
475                     break;
476                 case Native.IORING_OP_ASYNC_CANCEL:
477                     cancelComplete0(op, res, flags, data);
478                     break;
479                 case Native.IORING_OP_CONNECT:
480                     connectComplete(op, res, flags, data);
481 
482                     // once the connect was completed we can also free some resources that are not needed anymore.
483                     freeMsgHdrArray();
484                     freeRemoteAddressMemory();
485                     break;
486                 case Native.IORING_OP_CLOSE:
487                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
488                         if (delayedClose != null) {
489                             delayedClose.promise.setSuccess();
490                         }
491                         closed = true;
492                     }
493                     break;
494                 default:
495                     break;
496             }
497 
498             // We delay the actual close if there is still a write or read scheduled, let's see if there
499             // was a close that needs to be done now.
500             handleDelayedClosed();
501 
502             if (ioState == 0 && closed) {
503                 freeResourcesNowIfNeeded(registration);
504             }
505         }
506 
507         private void freeResourcesNowIfNeeded(IoRegistration reg) {
508             if (!freed) {
509                 freed = true;
510                 freeResourcesNow(reg);
511             }
512         }
513 
514         /**
515          * Free all resources now. No new IO will be submitted for this channel via io_uring
516          *
517          * @param reg   the {@link IoRegistration} or {@code null} if it was never registered
518          */
519         protected void freeResourcesNow(IoRegistration reg) {
520             freeMsgHdrArray();
521             freeRemoteAddressMemory();
522             if (reg != null) {
523                 reg.cancel();
524             }
525         }
526 
527         private void handleDelayedClosed() {
528             if (delayedClose != null && canCloseNow()) {
529                 closeNow();
530             }
531         }
532 
533         private void pollAddComplete(int res, int flags, short data) {
534             if ((res & Native.POLLOUT) != 0) {
535                 pollOut(res);
536             }
537             if ((res & Native.POLLIN) != 0) {
538                 pollIn(res, flags, data);
539             }
540             if ((res & Native.POLLRDHUP) != 0) {
541                 pollRdHup(res);
542             }
543         }
544 
545         @Override
546         public final void close() throws Exception {
547             close(voidPromise());
548         }
549 
        /**
         * Closes the channel, deferring the actual close while a READ or WRITE is still scheduled.
         * <p>
         * The first call records a {@code DelayedClose}. A void promise is replaced by a new one so completion can
         * be tracked. The call then fails a pending connect, cancels the connect timeout and all outstanding
         * operations, and closes right away if nothing is in flight anymore. Later calls while a close is pending
         * are notified once the first close promise completes.
         */
        @Override
        protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
            if (closeFuture().isDone()) {
                // Closed already before.
                safeSetSuccess(promise);
                return;
            }
            if (delayedClose == null) {
                // A write (or read) operation may still be pending and should be completed asap.
                // We do the actual close operation once its result was processed, as otherwise
                // we may get into trouble by closing the fd before the operation was processed.
                delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
            } else {
                // A close is already in progress: notify this promise once the first close completed.
                delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
                return;
            }

            boolean cancelConnect = false;
            try {
                ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                if (connectPromise != null) {
                    // Use tryFailure() instead of setFailure() to avoid the race against cancel().
                    connectPromise.tryFailure(new ClosedChannelException());
                    AbstractIoUringChannel.this.connectPromise = null;
                    cancelConnect = true;
                }

                cancelConnectTimeoutFuture();
            } finally {
                // It's important we cancel all outstanding connect, write and read operations now so
                // we will be able to process a delayed close if needed.
                cancelOps(cancelConnect);
            }

            if (canCloseNow()) {
                // Currently there is no WRITE or READ scheduled, so we can start to tear down the channel.
                closeNow();
            }
        }
589 
590         private void cancelOps(boolean cancelConnect) {
591             if (registration == null || !registration.isValid()) {
592                 return;
593             }
594             byte flags = flags((byte) 0);
595             if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
596                 assert pollRdhupId != 0;
597                 long id = registration.submit(
598                         IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
599                 assert id != 0;
600             }
601             if ((ioState & POLL_IN_SCHEDULED) != 0) {
602                 assert pollInId != 0;
603                 long id = registration.submit(
604                         IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
605                 assert id != 0;
606             }
607             if ((ioState & POLL_OUT_SCHEDULED) != 0) {
608                 assert pollOutId != 0;
609                 long id = registration.submit(
610                         IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
611                 assert id != 0;
612             }
613             if (cancelConnect && connectId != 0) {
614                 // Best effort to cancel the already submitted connect request.
615                 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
616                 assert id != 0;
617             }
618             cancelOutstandingReads(registration, numOutstandingReads);
619             cancelOutstandingWrites(registration, numOutstandingWrites);
620         }
621 
622         private boolean canCloseNow() {
623             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
624             // problems related to re-ordering of completions.
625             return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
626         }
627 
628         private void closeNow() {
629             super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
630         }
631 
632         @Override
633         protected final void flush0() {
634             // Flush immediately only when there's no pending flush.
635             // If there's a pending flush operation, event loop will call forceFlush() later,
636             // and thus there's no need to call it now.
637             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
638                 super.flush0();
639             }
640         }
641 
642         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
643             if (promise == null) {
644                 // Closed via cancellation and the promise has been notified already.
645                 return;
646             }
647 
648             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
649             promise.tryFailure(cause);
650             closeIfClosed();
651         }
652 
653         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
654             if (promise == null) {
655                 // Closed via cancellation and the promise has been notified already.
656                 return;
657             }
658             active = true;
659 
660             if (local == null) {
661                 local = socket.localAddress();
662             }
663             computeRemote();
664 
665             // Register POLLRDHUP
666             schedulePollRdHup();
667 
668             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
669             // We still need to ensure we call fireChannelActive() in this case.
670             boolean active = isActive();
671 
672             // trySuccess() will return false if a user cancelled the connection attempt.
673             boolean promiseSet = promise.trySuccess();
674 
675             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
676             // because what happened is what happened.
677             if (!wasActive && active) {
678                 pipeline().fireChannelActive();
679             }
680 
681             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
682             if (!promiseSet) {
683                 close(voidPromise());
684             }
685         }
686 
687         @Override
688         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
689             if (allocHandle == null) {
690                 allocHandle = new IoUringRecvByteAllocatorHandle(
691                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
692             }
693             return allocHandle;
694         }
695 
696         final void shutdownInput(boolean allDataRead) {
697             logger.trace("shutdownInput Fd: {}", fd().intValue());
698             if (!socket.isInputShutdown()) {
699                 if (isAllowHalfClosure(config())) {
700                     try {
701                         socket.shutdown(true, false);
702                     } catch (IOException ignored) {
703                         // We attempted to shutdown and failed, which means the input has already effectively been
704                         // shutdown.
705                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
706                         return;
707                     } catch (NotYetConnectedException ignore) {
708                         // We attempted to shutdown and failed, which means the input has already effectively been
709                         // shutdown.
710                     }
711                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
712                 } else {
713                     // Handle this same way as if we did read all data so we don't schedule another read.
714                     inputClosedSeenErrorOnRead = true;
715                     close(voidPromise());
716                     return;
717                 }
718             }
719             if (allDataRead && !inputClosedSeenErrorOnRead) {
720                 inputClosedSeenErrorOnRead = true;
721                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
722             }
723         }
724 
725         private void fireEventAndClose(Object evt) {
726             pipeline().fireUserEventTriggered(evt);
727             close(voidPromise());
728         }
729 
730         final void schedulePollIn() {
731             assert (ioState & POLL_IN_SCHEDULED) == 0;
732             if (!isActive() || shouldBreakIoUringInReady(config())) {
733                 return;
734             }
735             pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, IoUring.isPollAddMultishotSupported());
736         }
737 
        /**
         * Handles the completion of a (single-shot or multi-shot) read.
         * <p>
         * First the read bookkeeping ({@code READ_SCHEDULED}, {@code readPending}, {@code numOutstandingReads}) is
         * updated, then {@link #readComplete0(byte, int, int, short, int)} processes the result. Afterwards,
         * depending on whether the read loop is complete and on the kind of read, one of three things happens:
         * another read is scheduled, a multi-shot read is re-armed, or the outstanding multi-shot read is cancelled.
         *
         * @param op    the op code.
         * @param res   the result; {@link Native#ERRNO_ECANCELED_NEGATIVE} if the read was cancelled.
         * @param flags the completion flags ({@code IORING_CQE_F_MORE} and {@code IORING_CQE_F_SOCK_NONEMPTY} are
         *              inspected).
         * @param data  the data that was passed when submitting the op.
         */
        private void readComplete(byte op, int res, int flags, short data) {
            assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;

            // numOutstandingReads == -1 marks a multi-shot read (see scheduleRead0(...)).
            boolean multishot = numOutstandingReads == -1;
            // Without IORING_CQE_F_MORE no further completions will follow for this submission.
            boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
            if (rearm) {
                // Reset READ_SCHEDULED if there is nothing more to handle and so we need to re-arm. This works for
                // multi-shot and non multi-shot variants.
                ioState &= ~READ_SCHEDULED;
            }
            // Remember whether a read was requested before readPending may be reset below.
            boolean pending = readPending;
            if (multishot) {
                // Reset readPending so we can still keep track if we might need to cancel the multi-shot read or
                // not.
                readPending = false;
            } else if (--numOutstandingReads == 0) {
                // We received all outstanding completions.
                readPending = false;
                ioState &= ~READ_SCHEDULED;
            }
            inReadComplete = true;
            try {
                socketIsEmpty = socketIsEmpty(flags);
                socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
                        (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
                readComplete0(op, res, flags, data, numOutstandingReads);
            } finally {
                try {
                    // Check if we should consider the read loop to be done.
                    if (recvBufAllocHandle().isReadComplete()) {
                        // Reset the handle as we are done with the read-loop.
                        recvBufAllocHandle().reset(config());

                        // Check if this was a readComplete(...) triggered by a read or multi-shot read.
                        if (!multishot) {
                            if (readPending) {
                                // This was a "normal" read and the user did signal we should continue reading.
                                // Let's schedule the read now.
                                doBeginReadNow();
                            }
                        } else {
                            // The readComplete(...) was triggered by a multi-shot read. Because of this the state
                            // machine is a bit more complicated.

                            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                                // The readComplete(...) was triggered because the previous read was cancelled.
                                // In this case we need to check if the user did signal the desire to read again
                                // in the meantime. If this is the case we need to schedule the read to ensure
                                // we do not stall.
                                if (pending) {
                                    doBeginReadNow();
                                }
                            } else if (rearm) {
                                // We need to rearm the multishot as otherwise we might miss some data.
                                doBeginReadNow();
                            } else if (!readPending) {
                                // Cancel the multi-shot read now as the user did not signal that we want to keep
                                // reading while we handle the completion event.
                                cancelOutstandingReads(registration, numOutstandingReads);
                            }
                        }
                    } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                        // The readComplete(...) was triggered because the previous read was cancelled.
                        // In this case we need to check if the user did signal the desire to read again
                        // in the meantime. If this is the case we need to schedule the read to ensure
                        // we do not stall.
                        if (pending) {
                            doBeginReadNow();
                        }
                    } else if (multishot && rearm) {
                        // We need to rearm the multishot as otherwise we might miss some data.
                        doBeginReadNow();
                    }
                } finally {
                    // Per-completion state is only valid while this completion is being handled.
                    inReadComplete = false;
                    socketIsEmpty = false;
                }
            }
        }
817 
        /**
         * Called once a read was completed. {@link #readComplete(byte, int, int, short)} has already updated the
         * read bookkeeping and the {@code socketIsEmpty} / {@code socketHasMoreData} state before this runs.
         *
         * @param op                    the op code.
         * @param res                   the result of the read.
         * @param flags                 the completion flags.
         * @param data                  the data that was passed when submitting the op.
         * @param outstandingCompletes  the number of read completions still expected after this one, or {@code -1}
         *                              for a multi-shot read.
         */
        protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
822 
823         /**
824          * Called once POLLRDHUP event is ready to be processed
825          */
826         private void pollRdHup(int res) {
827             ioState &= ~POLL_RDHUP_SCHEDULED;
828             pollRdhupId = 0;
829             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
830                 return;
831             }
832 
833             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
834             recvBufAllocHandle().rdHupReceived();
835 
836             if (isActive()) {
837                 scheduleFirstReadIfNeeded();
838             } else {
839                 // Just to be safe make sure the input marked as closed.
840                 shutdownInput(false);
841             }
842         }
843 
844         /**
845          * Called once POLLIN event is ready to be processed
846          */
847         private void pollIn(int res, int flags, short data) {
848             // Check if we need to rearm. This works for both cases, POLL_ADD and POLL_ADD_MULTI.
849             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
850             if (rearm) {
851                 ioState &= ~POLL_IN_SCHEDULED;
852                 pollInId = 0;
853             }
854             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
855                 return;
856             }
857             if (!readPending) {
858                 // We received the POLLIN but the user is not interested yet in reading, just mark socketHasMoreData
859                 // as true so we will trigger a read directly once the user calls read()
860                 socketHasMoreData = true;
861                 return;
862             }
863             scheduleFirstReadIfNeeded();
864         }
865 
866         private void scheduleFirstReadIfNeeded() {
867             if ((ioState & READ_SCHEDULED) == 0) {
868                 scheduleFirstRead();
869             }
870         }
871 
872         private void scheduleFirstRead() {
873             // This is a new "read loop" so we need to reset the allocHandle.
874             final ChannelConfig config = config();
875             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
876             allocHandle.reset(config);
877             scheduleRead(true);
878         }
879 
880         protected final void scheduleRead(boolean first) {
881             // Only schedule another read if the fd is still open.
882             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
883                 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
884                 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
885                     ioState |= READ_SCHEDULED;
886                 }
887             }
888         }
889 
        /**
         * Schedules a read and returns the number of {@link #readComplete(byte, int, int, short)}
         * calls that are expected because of the scheduled read.
         * A return value of {@code 0} means nothing was scheduled; in that case the caller does not set
         * {@code READ_SCHEDULED}.
         *
         * @param first             {@code true} if this is the first read of a read loop.
         * @param socketIsEmpty     {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
         * @return                  the number of {@link #readComplete(byte, int, int, short)} calls expected or
         *                          {@code -1} if {@link #readComplete(byte, int, int, short)} is called until
         *                          the read is cancelled (multi-shot).
         */
        protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
901 
        /**
         * Called once a POLLOUT event is ready to be processed.
         * <p>
         * If a connect is pending, this tries to finish it. When it is still in progress, POLLOUT is scheduled
         * again; otherwise the connect promise is fulfilled (with success or the annotated failure) and the
         * connect state is cleared. Without a pending connect, flushing is retried unless the output is shut down.
         *
         * @param res   the poll result; {@link Native#ERRNO_ECANCELED_NEGATIVE} if the request was cancelled.
         */
        private void pollOut(int res) {
            ioState &= ~POLL_OUT_SCHEDULED;
            pollOutId = 0;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                return;
            }
            // pending connect
            if (connectPromise != null) {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.

                assert eventLoop().inEventLoop();

                boolean connectStillInProgress = false;
                try {
                    boolean wasActive = isActive();
                    if (!socket.finishConnect()) {
                        // The finally block below re-registers for POLLOUT in this case.
                        connectStillInProgress = true;
                        return;
                    }
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                    if (!connectStillInProgress) {
                        // cancelConnectTimeoutFuture() checks for null as the connectTimeoutFuture is only created
                        // if a connectTimeoutMillis > 0 is used
                        // See https://github.com/netty/netty/issues/1770
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                    } else {
                        // The connect was not done yet, register for POLLOUT again
                        schedulePollOut();
                    }
                }
            } else if (!socket.isOutputShutdown()) {
                // Try writing again
                super.flush0();
            }
        }
947 
        /**
         * Called once a write was completed.
         * <p>
         * If {@code CONNECT_SCHEDULED} is set, this completion belongs to the {@code sendmsg} used for
         * TCP_FASTOPEN_CONNECT:
         * <ul>
         *     <li>a positive result means the connect succeeded with {@code res} bytes already sent;</li>
         *     <li>{@code EINPROGRESS} or {@code 0} falls back to a normal connect;</li>
         *     <li>anything else is handled as a connect error.</li>
         * </ul>
         * Otherwise the completion is passed to {@link #writeComplete0(byte, int, int, short, int)}. If not
         * everything was written, POLLOUT is scheduled. Once the last outstanding write completes, another
         * write is scheduled if needed.
         *
         * @param op    the op code.
         * @param res   the result.
         * @param flags the flags.
         * @param data  the data that was passed when submitting the op.
         */
        private void writeComplete(byte op, int res, int flags, short data) {
            if ((ioState & CONNECT_SCHEDULED) != 0) {
                // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
                // TCP_FASTOPEN_CONNECT.
                freeMsgHdrArray();
                if (res > 0) {
                    // Connect complete!
                    outboundBuffer().removeBytes(res);

                    // Explicitly pass in 0 as this is returned by a connect(...) call when it was successful.
                    connectComplete(op, 0, flags, data);
                } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
                    // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
                    // In this case, our TCP connection will be established normally, but no data was transmitted at
                    // this time. We'll just transmit the data with normal writes later.
                    // Let's submit a normal connect.
                    submitConnect((InetSocketAddress) requestedRemoteAddress);
                } else {
                    // There was an error, handle it as a normal connect error.
                    connectComplete(op, res, flags, data);
                }
                return;
            }
            assert numOutstandingWrites > 0;
            --numOutstandingWrites;

            boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
            if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {

                // We were not able to write everything, let's register for POLLOUT
                schedulePollOut();
            }

            // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
            // while still removing messages internally in removeBytes(...) which then may corrupt state.
            if (numOutstandingWrites == 0) {
                ioState &= ~WRITE_SCHEDULED;

                // If we could write all and we did not schedule a pollout yet let us try to write again
                if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
                    scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
                }
            }
        }
1000 
        /**
         * Called once a write was completed, after the outstanding-write counter has been decremented.
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         * @param outstanding   the number of write completions still outstanding after this one.
         * @return              {@code true} if everything was written; on {@code false} the caller schedules a
         *                      POLLOUT (unless one is already scheduled) so writing can continue later.
         */
        abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1010 
1011         /**
1012          * Called once a cancel was completed.
1013          *
1014          * @param op            the op code
1015          * @param res           the result.
1016          * @param flags         the flags.
1017          * @param data          the data that was passed when submitting the op.
1018          */
1019         void cancelComplete0(byte op, int res, int flags, short data) {
1020             // NOOP
1021         }
1022 
1023         /**
1024          * Called once a connect was completed.
1025          * @param op            the op code.
1026          * @param res           the result.
1027          * @param flags         the flags.
1028          * @param data          the data that was passed when submitting the op.
1029          */
1030         void connectComplete(byte op, int res, int flags, short data) {
1031             ioState &= ~CONNECT_SCHEDULED;
1032             freeRemoteAddressMemory();
1033 
1034             if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1035                 // connect not complete yet need to wait for poll_out event
1036                 schedulePollOut();
1037             } else {
1038                 try {
1039                     if (res == 0) {
1040                         fulfillConnectPromise(connectPromise, active);
1041                         if (readPending) {
1042                             doBeginReadNow();
1043                         }
1044                     } else {
1045                         try {
1046                             Errors.throwConnectException("io_uring connect", res);
1047                         } catch (Throwable cause) {
1048                             fulfillConnectPromise(connectPromise, cause);
1049                         }
1050                     }
1051                 } finally {
1052                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
1053                     // used
1054                     // See https://github.com/netty/netty/issues/1770
1055                     cancelConnectTimeoutFuture();
1056                     connectPromise = null;
1057                 }
1058             }
1059         }
1060 
        /**
         * Starts connecting this channel to {@code remoteAddress}, binding to {@code localAddress} first if it is
         * not {@code null}.
         * <p>
         * If client-side TCP fast open is available and {@link ChannelOption#TCP_FASTOPEN_CONNECT} is enabled, the
         * current flushed {@link ByteBuf} (if any) is sent together with the connect via a {@code sendmsg} using
         * {@code MSG_FASTOPEN}. Otherwise a plain io_uring connect is submitted.
         * <p>
         * When a connect timeout is configured, a task is scheduled that fails the promise with a
         * {@link ConnectTimeoutException} and closes the channel. Cancelling the promise cancels that task and
         * closes the channel as well.
         *
         * @param remoteAddress the address to connect to.
         * @param localAddress  the address to bind to first, or {@code null}.
         * @param promise       the promise that is notified about the outcome of the connect attempt.
         */
        @Override
        public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
            // non-blocking io.
            if (promise.isDone() || !ensureOpen(promise)) {
                return;
            }

            if (delayedClose != null) {
                promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
                return;
            }
            try {
                if (connectPromise != null) {
                    throw new ConnectionPendingException();
                }
                if (localAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) remoteAddress);
                }

                if (remote != null) {
                    // Check if already connected before trying to connect. This is needed as connect(...) will not
                    // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
                    // EINPROGRESS and finished later.
                    throw new AlreadyConnectedException();
                }

                if (localAddress != null) {
                    socket.bind(localAddress);
                }

                // NOTE(review): a remoteAddress that is not an InetSocketAddress makes this cast throw; the catch
                // below turns that into a failed promise.
                InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;

                ByteBuf initialData = null;
                if (IoUring.isTcpFastOpenClientSideAvailable() &&
                        config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
                    // Use the current flushed message as fast-open payload if it is a ByteBuf.
                    ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
                    outbound.addFlush();
                    Object curr;
                    if ((curr = outbound.current()) instanceof ByteBuf) {
                        initialData = (ByteBuf) curr;
                    }
                }
                if (initialData != null) {
                    msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
                    MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
                    hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
                            initialData.readableBytes(), (short) 0);

                    int fd = fd().intValue();
                    IoRegistration registration = registration();
                    IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, flags((byte) 0), Native.MSG_FASTOPEN,
                            hdr.address(), hdr.idx());
                    connectId = registration.submit(ops);
                    if (connectId == 0) {
                        // Directly release the memory if submitting failed.
                        freeMsgHdrArray();
                    }
                } else {
                    submitConnect(inetSocketAddress);
                }
                if (connectId != 0) {
                    ioState |= CONNECT_SCHEDULED;
                }
            } catch (Throwable t) {
                closeIfClosed();
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                return;
            }
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;
            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                        if (connectPromise != null && !connectPromise.isDone() &&
                                connectPromise.tryFailure(new ConnectTimeoutException(
                                        "connection timed out: " + remoteAddress))) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    // If the connect future is cancelled we also cancel the timeout and close the
                    // underlying socket.
                    if (future.isCancelled()) {
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
1166     }
1167 
1168     private void submitConnect(InetSocketAddress inetSocketAddress) {
1169         remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1170         long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
1171 
1172         SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
1173 
1174         int fd = fd().intValue();
1175         IoRegistration registration = registration();
1176         IoUringIoOps ops = IoUringIoOps.newConnect(
1177                 fd, flags((byte) 0), remoteAddressMemoryAddress, nextOpsId());
1178         connectId = registration.submit(ops);
1179         if (connectId == 0) {
1180             // Directly release the memory if submitting failed.
1181             freeRemoteAddressMemory();
1182         }
1183     }
1184 
1185     @Override
1186     protected Object filterOutboundMessage(Object msg) {
1187         if (msg instanceof ByteBuf) {
1188             ByteBuf buf = (ByteBuf) msg;
1189             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1190         }
1191         throw new UnsupportedOperationException("unsupported message type");
1192     }
1193 
1194     @Override
1195     protected void doRegister(ChannelPromise promise) {
1196         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1197         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1198             if (f.isSuccess()) {
1199                 registration = (IoRegistration) f.getNow();
1200                 promise.setSuccess();
1201             } else {
1202                 promise.setFailure(f.cause());
1203             }
1204         });
1205     }
1206 
1207     @Override
1208     protected final void doDeregister() {
1209         // Cancel all previous submitted ops.
1210         ioUringUnsafe().cancelOps(connectPromise != null);
1211     }
1212 
1213     @Override
1214     protected void doBind(final SocketAddress local) throws Exception {
1215         if (local instanceof InetSocketAddress) {
1216             checkResolvable((InetSocketAddress) local);
1217         }
1218         socket.bind(local);
1219         this.local = socket.localAddress();
1220     }
1221 
1222     protected static void checkResolvable(InetSocketAddress addr) {
1223         if (addr.isUnresolved()) {
1224             throw new UnresolvedAddressException();
1225         }
1226     }
1227 
1228     @Override
1229     protected final SocketAddress localAddress0() {
1230         return local;
1231     }
1232 
1233     @Override
1234     protected final SocketAddress remoteAddress0() {
1235         return remote;
1236     }
1237 
1238     private static boolean isAllowHalfClosure(ChannelConfig config) {
1239         return config instanceof SocketChannelConfig &&
1240                ((SocketChannelConfig) config).isAllowHalfClosure();
1241     }
1242 
1243     private void cancelConnectTimeoutFuture() {
1244         if (connectTimeoutFuture != null) {
1245             connectTimeoutFuture.cancel(false);
1246             connectTimeoutFuture = null;
1247         }
1248     }
1249 
1250     private void computeRemote() {
1251         if (requestedRemoteAddress instanceof InetSocketAddress) {
1252             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1253         }
1254     }
1255 
1256     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1257         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1258     }
1259 
    /**
     * Returns whether the socket was guaranteed to be empty at the time the submitted I/O was executed and the
     * completion event was created.
     * @param flags     the flags that were part of the completion.
     * @return          {@code true} if the socket was guaranteed to be empty, {@code false} otherwise.
     */
    protected abstract boolean socketIsEmpty(int flags);
1267 
    /**
     * NOTE(review): not used in this part of the file. Judging by the name, this presumably tells whether a POLLIN
     * should be armed before the first read is submitted. TODO confirm against the implementations.
     */
    abstract boolean isPollInFirst();
1269 }