View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.Errors;
43  import io.netty.channel.unix.FileDescriptor;
44  import io.netty.channel.unix.UnixChannel;
45  import io.netty.channel.unix.UnixChannelUtil;
46  import io.netty.util.ReferenceCountUtil;
47  import io.netty.util.concurrent.PromiseNotifier;
48  import io.netty.util.internal.logging.InternalLogger;
49  import io.netty.util.internal.logging.InternalLoggerFactory;
50  
51  import java.io.IOException;
52  import java.net.InetSocketAddress;
53  import java.net.SocketAddress;
54  import java.nio.ByteBuffer;
55  import java.nio.channels.AlreadyConnectedException;
56  import java.nio.channels.ClosedChannelException;
57  import java.nio.channels.ConnectionPendingException;
58  import java.nio.channels.NotYetConnectedException;
59  import java.nio.channels.UnresolvedAddressException;
60  import java.util.concurrent.ScheduledFuture;
61  import java.util.concurrent.TimeUnit;
62  
63  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
64  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
65  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
66  import static io.netty.util.internal.ObjectUtil.checkNotNull;
67  
68  
69  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
    // Logger used for trace output by this class.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
    // The socket backing this channel; also exposed as the FileDescriptor returned by fd().
    final LinuxSocket socket;
    // Backing state of isActive(): set to true by the constructors / on a successful connect
    // and reset to false in doClose().
    protected volatile boolean active;

    // Different masks for outstanding I/O operations.
    // Each mask occupies a single bit of ioState; the bit is set while an operation of that kind is scheduled.
    private static final int POLL_IN_SCHEDULED = 1;
    private static final int POLL_OUT_SCHEDULED = 1 << 2;
    private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
    private static final int WRITE_SCHEDULED = 1 << 4;
    private static final int READ_SCHEDULED = 1 << 5;
    private static final int CONNECT_SCHEDULED = 1 << 6;

    // Counter behind nextOpsId(). Starts at the smallest short value; nextOpsId() never hands out 0.
    private short opsId = Short.MIN_VALUE;

    // Ids of the currently scheduled poll operations as returned by schedulePollAdd(...). They are reset
    // to 0 once the matching poll completion was processed and are used to remove the poll on close.
    private long pollInId;
    private long pollOutId;
    private long pollRdhupId;
    // Id of the submitted connect operation; a non-zero value is used by cancelOps(...) to cancel it.
    private long connectId;

    // A byte is enough for now.
    // Bit set built from the *_SCHEDULED masks above.
    private byte ioState;

    // It's possible that multiple reads / writes are issued. We need to keep track of these.
    // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
    // enough but let's be a bit more flexible for now.
    private short numOutstandingWrites;
    private short numOutstandingReads;

    // true if a read was requested via doBeginRead() but could not be started right away.
    private boolean readPending;
    // true while readComplete(...) is running, so doBeginRead() only records the request as pending.
    private boolean inReadComplete;
100 
101     private final class DelayedClose {
102         private final ChannelPromise promise;
103         private final Throwable cause;
104         private final ClosedChannelException closeCause;
105 
106         DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
107             this.promise = promise;
108             this.cause = cause;
109             this.closeCause = closeCause;
110         }
111     }
    // Non-null once a close was requested; while set, no new reads or writes are scheduled.
    private DelayedClose delayedClose;
    // Set by shutdownInput(...) once all data was read after the input was shut down; doBeginRead() stops
    // issuing reads when this is true.
    private boolean inputClosedSeenErrorOnRead;

    /**
     * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    // NOTE(review): presumably the timeout task of the pending connect attempt - confirm against the connect code.
    private ScheduledFuture<?> connectTimeoutFuture;
    // NOTE(review): presumably the address passed to connect(...) - its use is not visible in this part of the file.
    private SocketAddress requestedRemoteAddress;
    // Native memory released via Buffer.free(...) in freeRemoteAddressMemory().
    private ByteBuffer remoteAddressMemory;
    // Released in freeMsgHdrArray().
    private MsgHdrMemoryArray msgHdrMemoryArray;

    // The registration used to submit operations; null as long as the channel was never registered.
    private IoUringIoRegistration registration;

    // Cached local / remote addresses of the socket.
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
128 
129     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
130         super(parent);
131         this.socket = checkNotNull(socket, "fd");
132 
133         if (active) {
134             // Directly cache the remote and local addresses
135             // See https://github.com/netty/netty/issues/2359
136             this.active = true;
137             this.local = socket.localAddress();
138             this.remote = socket.remoteAddress();
139         }
140 
141         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
142     }
143 
144     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
145         super(parent);
146         this.socket = checkNotNull(fd, "fd");
147         this.active = true;
148 
149         // Directly cache the remote and local addresses
150         // See https://github.com/netty/netty/issues/2359
151         this.remote = remote;
152         this.local = fd.localAddress();
153     }
154 
155     /**
156      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
157      *
158      * @return  opsId
159      */
160     protected final short nextOpsId() {
161         short id = opsId++;
162 
163         // We use 0 for "none".
164         if (id == 0) {
165             id = opsId++;
166         }
167         return id;
168     }
169 
170     public final boolean isOpen() {
171         return socket.isOpen();
172     }
173 
174     @Override
175     public boolean isActive() {
176         return active;
177     }
178 
179     @Override
180     public final FileDescriptor fd() {
181         return socket;
182     }
183 
184     private AbstractUringUnsafe ioUringUnsafe() {
185         return (AbstractUringUnsafe) unsafe();
186     }
187 
188     @Override
189     protected boolean isCompatible(final EventLoop loop) {
190         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
191     }
192 
193     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
194         return newDirectBuffer(buf, buf);
195     }
196 
197     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
198         final int readableBytes = buf.readableBytes();
199         if (readableBytes == 0) {
200             ReferenceCountUtil.release(holder);
201             return Unpooled.EMPTY_BUFFER;
202         }
203 
204         final ByteBufAllocator alloc = alloc();
205         if (alloc.isDirectBufferPooled()) {
206             return newDirectBuffer0(holder, buf, alloc, readableBytes);
207         }
208 
209         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
210         if (directBuf == null) {
211             return newDirectBuffer0(holder, buf, alloc, readableBytes);
212         }
213 
214         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
215         ReferenceCountUtil.safeRelease(holder);
216         return directBuf;
217     }
218 
219     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
220         final ByteBuf directBuf = alloc.directBuffer(capacity);
221         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
222         ReferenceCountUtil.safeRelease(holder);
223         return directBuf;
224     }
225 
    /**
     * Cancel all outstanding reads. Called while the channel is being closed, together with the cancellation
     * of the other outstanding operations.
     *
     * @param registration          the {@link IoUringIoRegistration}.
     * @param numOutstandingReads   the number of outstanding reads.
     */
    protected abstract void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads);
233 
    /**
     * Cancel all outstanding writes. Called while the channel is being closed, together with the cancellation
     * of the other outstanding operations.
     *
     * @param registration          the {@link IoUringIoRegistration}.
     * @param numOutstandingWrites  the number of outstanding writes.
     */
    protected abstract void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites);
241 
    /**
     * Does nothing in this base class.
     */
    @Override
    protected void doDisconnect() throws Exception {
        // Intentionally empty.
    }
245 
246     protected byte flags(byte flags) {
247         if (((IOUringChannelConfig) config()).getIoseqAsync()) {
248             return (byte) (flags | Native.IOSQE_ASYNC);
249         }
250         return flags;
251     }
252 
253     private void freeRemoteAddressMemory() {
254         if (remoteAddressMemory != null) {
255             Buffer.free(remoteAddressMemory);
256             remoteAddressMemory = null;
257         }
258     }
259 
260     private void freeMsgHdrArray() {
261         if (msgHdrMemoryArray != null) {
262             msgHdrMemoryArray.release();
263             msgHdrMemoryArray = null;
264         }
265     }
266 
267     @Override
268     protected void doClose() throws Exception {
269         active = false;
270 
271         if (registration != null) {
272             if (socket.markClosed()) {
273                 int fd = fd().intValue();
274                 IoUringIoOps ops = IoUringIoOps.newClose(fd, flags((byte) 0), nextOpsId());
275                 registration.submit(ops);
276             }
277         } else {
278             // This one was never registered just use a syscall to close.
279             socket.close();
280             ioUringUnsafe().freeResourcesNowIfNeeded(null);
281         }
282     }
283 
284     @Override
285     protected final void doBeginRead() {
286         if (inputClosedSeenErrorOnRead) {
287             // We did see an error while reading and so closed the input. Stop reading.
288             return;
289         }
290         if (readPending) {
291             // We already have a read pending.
292             return;
293         }
294         if (inReadComplete || !isActive()) {
295             // We are currently in the readComplete(...) callback which might issue more reads by itself. Just
296             // mark it as a pending read. If readComplete(...) will not issue more reads itself it will pick up
297             // the readPending flag, reset it and call doBeginReadNow().
298             readPending = true;
299             return;
300         }
301         doBeginReadNow();
302     }
303 
304     private void doBeginReadNow() {
305         if (!((IOUringChannelConfig) config()).getPollInFirst()) {
306             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
307             ioUringUnsafe().scheduleFirstReadIfNeeded();
308         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
309             ioUringUnsafe().schedulePollIn();
310         }
311     }
312 
313     private void submitAndRunNow() {
314         // Force a submit and processing of the completions to ensure we drain the outbound buffer and
315         // send the data to the remote peer.
316         registration().submit(IoUringIoHandler.SUBMIT_AND_RUN_ALL);
317     }
318 
319     @Override
320     protected void doWrite(ChannelOutboundBuffer in) {
321         scheduleWriteIfNeeded(in, true);
322     }
323 
324     protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
325         if ((ioState & WRITE_SCHEDULED) != 0) {
326             return;
327         }
328         if (scheduleWrite(in) > 0) {
329             ioState |= WRITE_SCHEDULED;
330             if (submitAndRunNow && !isWritable()) {
331                 submitAndRunNow();
332             }
333         }
334     }
335 
336     private int scheduleWrite(ChannelOutboundBuffer in) {
337         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
338             return 0;
339         }
340         if (in == null) {
341             return 0;
342         }
343 
344         int msgCount = in.size();
345         if (msgCount == 0) {
346             return 0;
347         }
348         Object msg = in.current();
349 
350         if (msgCount > 1 && in.current() instanceof ByteBuf) {
351             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
352         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
353                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
354             // We also need some special handling for CompositeByteBuf
355             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
356         } else {
357             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
358         }
359         // Ensure we never overflow
360         assert numOutstandingWrites > 0;
361         return numOutstandingWrites;
362     }
363 
364     protected final IoUringIoRegistration registration() {
365         assert registration != null;
366         return registration;
367     }
368 
369     private void schedulePollOut() {
370         pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT);
371     }
372 
373     final void schedulePollRdHup() {
374         pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP);
375     }
376 
377     private long schedulePollAdd(int ioMask, int mask) {
378         assert (ioState & ioMask) == 0;
379         int fd = fd().intValue();
380         IoUringIoRegistration registration = registration();
381         IoUringIoOps ops = IoUringIoOps.newPollAdd(
382                 fd, flags((byte) 0), mask, (short) mask);
383         long id = registration.submit(ops);
384         if (id != 0) {
385             ioState |= ioMask;
386         }
387         return id;
388     }
389 
390     final void resetCachedAddresses() {
391         local = socket.localAddress();
392         remote = socket.remoteAddress();
393     }
394 
395     abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
        // Lazily created by recvBufAllocHandle().
        private IoUringRecvByteAllocatorHandle allocHandle;
        // Only set while a read completion is processed; passed on to scheduleRead0(...).
        private boolean socketIsEmpty;

        /**
         * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param in    the {@link ChannelOutboundBuffer} that holds the messages to write.
         * @return      the number of expected write completions.
         */
        protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);

        /**
         * Schedule the write of a single message and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param msg   the message to write.
         * @return      the number of expected write completions.
         */
        protected abstract int scheduleWriteSingle(Object msg);

        // Set once the completion of the close operation was seen (and it was not cancelled).
        private boolean closed;
        // Guards freeResourcesNow(...) so the resources are only freed once.
        private boolean freed;
413 
414         @Override
415         public final void handle(IoRegistration registration, IoEvent ioEvent) {
416             IoUringIoRegistration reg = (IoUringIoRegistration) registration;
417             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
418             byte op = event.opcode();
419             int res = event.res();
420             int flags = event.flags();
421             short data = event.data();
422             switch (op) {
423                 case Native.IORING_OP_RECV:
424                 case Native.IORING_OP_ACCEPT:
425                 case Native.IORING_OP_RECVMSG:
426                 case Native.IORING_OP_READ:
427                     readComplete(op, res, flags, data);
428 
429                     // We delay the actual close if there is still a write or read scheduled, let's see if there
430                     // was a close that needs to be done now.
431                     handleDelayedClosed();
432                     break;
433                 case Native.IORING_OP_WRITEV:
434                 case Native.IORING_OP_SEND:
435                 case Native.IORING_OP_SENDMSG:
436                 case Native.IORING_OP_WRITE:
437                 case Native.IORING_OP_SPLICE:
438                     writeComplete(op, res, flags, data);
439 
440                     // We delay the actual close if there is still a write or read scheduled, let's see if there
441                     // was a close that needs to be done now.
442                     handleDelayedClosed();
443                     break;
444                 case Native.IORING_OP_POLL_ADD:
445                     pollAddComplete(res, flags, data);
446                     break;
447                 case Native.IORING_OP_POLL_REMOVE:
448                     // fd reuse can replace a closed channel (with pending POLL_REMOVE CQEs)
449                     // with a new one: better not making it to mess-up with its state!
450                     if (res == 0) {
451                         clearPollFlag(data);
452                     }
453                     break;
454                 case Native.IORING_OP_ASYNC_CANCEL:
455                     cancelComplete0(op, res, flags, data);
456 
457                     // We delay the actual close if there is still a write or read scheduled, let's see if there
458                     // was a close that needs to be done now.
459                     handleDelayedClosed();
460                     break;
461                 case Native.IORING_OP_CONNECT:
462                     connectComplete(op, res, flags, data);
463 
464                     // once the connect was completed we can also free some resources that are not needed anymore.
465                     freeMsgHdrArray();
466                     freeRemoteAddressMemory();
467                     break;
468                 case Native.IORING_OP_CLOSE:
469                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
470                         if (delayedClose != null) {
471                             delayedClose.promise.setSuccess();
472                         }
473                         closed = true;
474                     }
475                     break;
476             }
477 
478             if (ioState == 0 && closed) {
479                 freeResourcesNowIfNeeded(reg);
480             }
481         }
482 
483         private void freeResourcesNowIfNeeded(IoUringIoRegistration reg) {
484             if (!freed) {
485                 freed = true;
486                 freeResourcesNow(reg);
487             }
488         }
489 
490         /**
491          * Free all resources now. No new IO will be submitted for this channel via io_uring
492          *
493          * @param reg   the {@link IoUringIoRegistration} or {@code null} if it was never registered
494          */
495         protected void freeResourcesNow(IoUringIoRegistration reg) {
496             freeMsgHdrArray();
497             freeRemoteAddressMemory();
498             if (reg != null) {
499                 reg.cancel();
500             }
501         }
502 
503         private void handleDelayedClosed() {
504             if (delayedClose != null && canCloseNow()) {
505                 closeNow();
506             }
507         }
508 
509         private void pollAddComplete(int res, int flags, short data) {
510             if ((data & Native.POLLOUT) != 0) {
511                 pollOut(res);
512             }
513             if ((data & Native.POLLIN) != 0) {
514                 pollIn(res);
515             }
516             if ((data & Native.POLLRDHUP) != 0) {
517                 pollRdHup(res);
518             }
519         }
520 
521         @Override
522         public final void close() throws Exception {
523             close(voidPromise());
524         }
525 
526         @Override
527         protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
528             if (closeFuture().isDone()) {
529                 // Closed already before.
530                 safeSetSuccess(promise);
531                 return;
532             }
533             if (delayedClose == null) {
534                 // We have a write operation pending that should be completed asap.
535                 // We will do the actual close operation one this write result is returned as otherwise
536                 // we may get into trouble as we may close the fd while we did not process the write yet.
537                 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
538             } else {
539                 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
540                 return;
541             }
542 
543             boolean cancelConnect = false;
544             try {
545                 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
546                 if (connectPromise != null) {
547                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
548                     connectPromise.tryFailure(new ClosedChannelException());
549                     AbstractIoUringChannel.this.connectPromise = null;
550                     cancelConnect = true;
551                 }
552 
553                 cancelConnectTimeoutFuture();
554             } finally {
555                 // It's important we cancel all outstanding connect, write and read operations now so
556                 // we will be able to process a delayed close if needed.
557                 cancelOps(cancelConnect);
558             }
559 
560             if (canCloseNow()) {
561                 // Currently there are is no WRITE and READ scheduled so we can start to teardown the channel.
562                 closeNow();
563             }
564         }
565 
566         private void cancelOps(boolean cancelConnect) {
567             if (registration == null || !registration.isValid()) {
568                 return;
569             }
570             int fd = fd().intValue();
571             byte flags = flags((byte) 0);
572             if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
573                 registration.submit(IoUringIoOps.newPollRemove(
574                         fd, flags, pollRdhupId, (short) Native.POLLRDHUP));
575             }
576             if ((ioState & POLL_IN_SCHEDULED) != 0) {
577                 registration.submit(IoUringIoOps.newPollRemove(
578                         fd, flags, pollInId, (short) Native.POLLIN));
579             }
580             if ((ioState & POLL_OUT_SCHEDULED) != 0) {
581                 registration.submit(IoUringIoOps.newPollRemove(
582                         fd, flags, pollOutId, (short) Native.POLLOUT));
583             }
584             if (cancelConnect && connectId != 0) {
585                 // Best effort to cancel the already submitted connect request.
586                 registration.submit(IoUringIoOps.newAsyncCancel(
587                         fd, flags, connectId, Native.IORING_OP_CONNECT));
588             }
589             cancelOutstandingReads(registration, numOutstandingReads);
590             cancelOutstandingWrites(registration, numOutstandingWrites);
591         }
592 
593         private boolean canCloseNow() {
594             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
595             // problems related to re-ordering of completions.
596             return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
597         }
598 
599         private void closeNow() {
600             super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
601         }
602 
603         @Override
604         protected final void flush0() {
605             // Flush immediately only when there's no pending flush.
606             // If there's a pending flush operation, event loop will call forceFlush() later,
607             // and thus there's no need to call it now.
608             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
609                 super.flush0();
610             }
611         }
612 
613         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
614             if (promise == null) {
615                 // Closed via cancellation and the promise has been notified already.
616                 return;
617             }
618 
619             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
620             promise.tryFailure(cause);
621             closeIfClosed();
622         }
623 
624         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
625             if (promise == null) {
626                 // Closed via cancellation and the promise has been notified already.
627                 return;
628             }
629             active = true;
630 
631             if (local == null) {
632                 local = socket.localAddress();
633             }
634             computeRemote();
635 
636             // Register POLLRDHUP
637             schedulePollRdHup();
638 
639             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
640             // We still need to ensure we call fireChannelActive() in this case.
641             boolean active = isActive();
642 
643             // trySuccess() will return false if a user cancelled the connection attempt.
644             boolean promiseSet = promise.trySuccess();
645 
646             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
647             // because what happened is what happened.
648             if (!wasActive && active) {
649                 pipeline().fireChannelActive();
650             }
651 
652             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
653             if (!promiseSet) {
654                 close(voidPromise());
655             }
656         }
657 
658         @Override
659         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
660             if (allocHandle == null) {
661                 allocHandle = new IoUringRecvByteAllocatorHandle(
662                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
663             }
664             return allocHandle;
665         }
666 
667         final void shutdownInput(boolean allDataRead) {
668             logger.trace("shutdownInput Fd: {}", fd().intValue());
669             if (!socket.isInputShutdown()) {
670                 if (isAllowHalfClosure(config())) {
671                     try {
672                         socket.shutdown(true, false);
673                     } catch (IOException ignored) {
674                         // We attempted to shutdown and failed, which means the input has already effectively been
675                         // shutdown.
676                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
677                         return;
678                     } catch (NotYetConnectedException ignore) {
679                         // We attempted to shutdown and failed, which means the input has already effectively been
680                         // shutdown.
681                     }
682                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
683                 } else {
684                     close(voidPromise());
685                     return;
686                 }
687             }
688             if (allDataRead && !inputClosedSeenErrorOnRead) {
689                 inputClosedSeenErrorOnRead = true;
690                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
691             }
692         }
693 
694         private void fireEventAndClose(Object evt) {
695             pipeline().fireUserEventTriggered(evt);
696             close(voidPromise());
697         }
698 
699         final void schedulePollIn() {
700             assert (ioState & POLL_IN_SCHEDULED) == 0;
701             if (!isActive() || shouldBreakIoUringInReady(config())) {
702                 return;
703             }
704             pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN);
705         }
706 
707         private void readComplete(byte op, int res, int flags, short data) {
708             assert numOutstandingReads > 0;
709             if (--numOutstandingReads == 0) {
710                 readPending = false;
711                 ioState &= ~READ_SCHEDULED;
712             }
713             inReadComplete = true;
714             try {
715                 socketIsEmpty = socketIsEmpty(flags);
716 
717                 readComplete0(op, res, flags, data, numOutstandingReads);
718             } finally {
719                 socketIsEmpty = false;
720                 inReadComplete = false;
721                 // There is a pending read and readComplete0(...) also did stop issue read request.
722                 // Let's trigger the requested read now.
723                 if (readPending && recvBufAllocHandle().isReadComplete()) {
724                     doBeginReadNow();
725                 }
726             }
727         }
728 
        /**
         * Called once a read was completed.
         *
         * @param op                    the opcode of the completed operation.
         * @param res                   the result of the completed operation.
         * @param flags                 the flags of the completion.
         * @param data                  the data of the completion.
         * @param outstandingCompletes  the number of reads that are still outstanding after this one.
         */
        protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
733 
734         /**
735          * Called once POLLRDHUP event is ready to be processed
736          */
737         private void pollRdHup(int res) {
738             ioState &= ~POLL_RDHUP_SCHEDULED;
739             pollRdhupId = 0;
740             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
741                 return;
742             }
743 
744             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
745             recvBufAllocHandle().rdHupReceived();
746 
747             if (isActive()) {
748                 scheduleFirstReadIfNeeded();
749             } else {
750                 // Just to be safe make sure the input marked as closed.
751                 shutdownInput(false);
752             }
753         }
754 
755         /**
756          * Called once POLLIN event is ready to be processed
757          */
758         private void pollIn(int res) {
759             ioState &= ~POLL_IN_SCHEDULED;
760             pollInId = 0;
761             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
762                 return;
763             }
764 
765             scheduleFirstReadIfNeeded();
766         }
767 
768         private void scheduleFirstReadIfNeeded() {
769             if ((ioState & READ_SCHEDULED) == 0) {
770                 scheduleFirstRead();
771             }
772         }
773 
774         private void scheduleFirstRead() {
775             // This is a new "read loop" so we need to reset the allocHandle.
776             final ChannelConfig config = config();
777             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
778             allocHandle.reset(config);
779             scheduleRead(true);
780         }
781 
782         protected final void scheduleRead(boolean first) {
783             // Only schedule another read if the fd is still open.
784             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
785                 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
786                 if (numOutstandingReads > 0) {
787                     ioState |= READ_SCHEDULED;
788                 }
789             }
790         }
791 
        /**
         * Schedule a read and returns the number of {@link #readComplete(byte, int, int, short)}
         * calls that are expected because of the scheduled read.
         *
         * @param first             {@code true} if this is the first read of a read loop.
         * @param socketIsEmpty     {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
         * @return                  the number of expected read completions; the caller only marks a read as
         *                          scheduled when this is greater than {@code 0}.
         */
        protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
800 
        /**
         * Called once POLLOUT event is ready to be processed.
         *
         * Clears the POLLOUT bookkeeping first. If a connect is pending it tries to finish it, otherwise
         * it retries flushing unless the output side of the socket is shut down.
         *
         * @param res   the result.
         */
        private void pollOut(int res) {
            ioState &= ~POLL_OUT_SCHEDULED;
            pollOutId = 0;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                // The poll was cancelled, there is nothing to process.
                return;
            }
            // pending connect
            if (connectPromise != null) {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.

                assert eventLoop().inEventLoop();

                boolean connectStillInProgress = false;
                try {
                    // Capture the active state before finishing the connect.
                    boolean wasActive = isActive();
                    if (!socket.finishConnect()) {
                        // Not connected yet. The finally block below re-registers for POLLOUT because
                        // the flag is set before returning.
                        connectStillInProgress = true;
                        return;
                    }
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                    if (!connectStillInProgress) {
                        // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0
                        // is used
                        // See https://github.com/netty/netty/issues/1770
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                    } else {
                        // The connect was not done yet, register for POLLOUT again
                        schedulePollOut();
                    }
                }
            } else if (!socket.isOutputShutdown()) {
                // Try writing again
                super.flush0();
            }
        }
846 
        /**
         * Called once a write was completed.
         *
         * While a connect is scheduled the completion belongs to the sendmsg(...) that was submitted for
         * TCP_FASTOPEN_CONNECT and is handled as part of the connect. Otherwise the outstanding write
         * counter is decremented and the completion is passed on to
         * {@link #writeComplete0(byte, int, int, short, int)}.
         *
         * @param op    the op code.
         * @param res   the result.
         * @param flags the flags.
         * @param data  the data that was passed when submitting the op.
         */
        private void writeComplete(byte op, int res, int flags, short data) {
            if ((ioState & CONNECT_SCHEDULED) != 0) {
                // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
                // TCP_FASTOPEN_CONNECT.
                freeMsgHdrArray();
                if (res > 0) {
                    // Connect complete!
                    // A positive result is the number of bytes that were sent, remove them from the outbound buffer.
                    outboundBuffer().removeBytes(res);

                    // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
                    connectComplete(op, 0, flags, data);
                } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
                    // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
                    // In this case, our TCP connection will be established normally, but no data was transmitted at
                    // this time. We'll just transmit the data with normal writes later.
                    // Let's submit a normal connect.
                    submitConnect((InetSocketAddress) requestedRemoteAddress);
                } else {
                    // There was an error, handle it as a normal connect error.
                    connectComplete(op, res, flags, data);
                }
                return;
            }
            assert numOutstandingWrites > 0;
            --numOutstandingWrites;

            boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
            if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {

                // We were not able to write everything, let's register for POLLOUT
                schedulePollOut();
            }

            // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
            // while still removing messages internally in removeBytes(...) which then may corrupt state.
            if (numOutstandingWrites == 0) {
                ioState &= ~WRITE_SCHEDULED;

                // If we could write all and we did not schedule a pollout yet let us try to write again
                if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
                    scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
                }
            }
        }
899 
        /**
         * Called once a write was completed.
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         * @param outstanding   the outstanding write completions.
         * @return              {@code true} if everything could be written. On {@code false} the caller
         *                      registers for POLLOUT if that is not scheduled already.
         */
        abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
909 
        /**
         * Called once a cancel was completed. This implementation does nothing.
         *
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         */
        void cancelComplete0(byte op, int res, int flags, short data) {
            // NOOP
        }
921 
922         /**
923          * Called once a connect was completed.
924          * @param op            the op code.
925          * @param res           the result.
926          * @param flags         the flags.
927          * @param data          the data that was passed when submitting the op.
928          */
929         void connectComplete(byte op, int res, int flags, short data) {
930             ioState &= ~CONNECT_SCHEDULED;
931             freeRemoteAddressMemory();
932 
933             if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
934                 // connect not complete yet need to wait for poll_out event
935                 schedulePollOut();
936             } else {
937                 try {
938                     if (res == 0) {
939                         fulfillConnectPromise(connectPromise, active);
940                         if (readPending) {
941                             doBeginReadNow();
942                         }
943                     } else {
944                         try {
945                             Errors.throwConnectException("io_uring connect", res);
946                         } catch (Throwable cause) {
947                             fulfillConnectPromise(connectPromise, cause);
948                         }
949                     }
950                 } finally {
951                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
952                     // used
953                     // See https://github.com/netty/netty/issues/1770
954                     cancelConnectTimeoutFuture();
955                     connectPromise = null;
956                 }
957             }
958         }
959 
        /**
         * Starts a non-blocking connect to {@code remoteAddress}, optionally binding to {@code localAddress}
         * first.
         *
         * If TCP fast open is available and enabled and the current flushed outbound message is a
         * {@link ByteBuf}, the connect is submitted as a sendmsg(...) with {@code MSG_FASTOPEN} that carries
         * that buffer. Otherwise a normal connect is submitted. Any failure while preparing the connect is
         * reported through {@code promise}.
         *
         * @param remoteAddress the address to connect to.
         * @param localAddress  the address to bind to before connecting, or {@code null} to skip the bind.
         * @param promise       the promise that is notified about the outcome of the connect.
         */
        @Override
        public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
            // non-blocking io.
            if (promise.isDone() || !ensureOpen(promise)) {
                return;
            }

            // A close is pending, so fail the connect right away.
            if (delayedClose != null) {
                promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
                return;
            }
            try {
                // Only one connect attempt may be in flight at a time.
                if (connectPromise != null) {
                    throw new ConnectionPendingException();
                }
                if (localAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) remoteAddress);
                }

                if (remote != null) {
                    // Check if already connected before trying to connect. This is needed as connect(...) will not
                    // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
                    // EINPROGRESS and finished later.
                    throw new AlreadyConnectedException();
                }

                if (localAddress != null) {
                    socket.bind(localAddress);
                }

                // NOTE(review): this cast is unconditional. Any other SocketAddress type ends up as a
                // ClassCastException that the catch block below reports through the promise.
                InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;

                ByteBuf initialData = null;
                if (IoUring.isTcpFastOpenClientSideAvailable() &&
                        config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
                    // Mark the pending writes as flushed so the first one can be sent together with the connect.
                    ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
                    outbound.addFlush();
                    Object curr;
                    if ((curr = outbound.current()) instanceof ByteBuf) {
                        initialData = (ByteBuf) curr;
                    }
                }
                if (initialData != null) {
                    // Fast open path: submit a sendmsg(...) with MSG_FASTOPEN that carries the initial data.
                    msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
                    MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
                    hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
                            initialData.readableBytes(), (short) 0);

                    int fd = fd().intValue();
                    IoUringIoRegistration registration = registration();
                    IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, flags((byte) 0), Native.MSG_FASTOPEN,
                            hdr.address(), hdr.idx());
                    connectId = registration.submit(ops);
                    if (connectId == 0) {
                        // Directly release the memory if submitting failed.
                        freeMsgHdrArray();
                    }
                } else {
                    submitConnect(inetSocketAddress);
                }
                // NOTE(review): when connectId is 0 the promise is still stored below although no connect is
                // marked as scheduled - confirm that something completes the promise in that case.
                if (connectId != 0) {
                    ioState |= CONNECT_SCHEDULED;
                }
            } catch (Throwable t) {
                closeIfClosed();
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                return;
            }
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;
            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        // Only close the channel if this task was the one that failed the promise.
                        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                        if (connectPromise != null && !connectPromise.isDone() &&
                                connectPromise.tryFailure(new ConnectTimeoutException(
                                        "connection timed out: " + remoteAddress))) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    // If the connect future is cancelled we also cancel the timeout and close the
                    // underlying socket.
                    if (future.isCancelled()) {
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
1065     }
1066 
1067     private void submitConnect(InetSocketAddress inetSocketAddress) {
1068         remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1069         long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
1070 
1071         SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
1072 
1073         int fd = fd().intValue();
1074         IoUringIoRegistration registration = registration();
1075         IoUringIoOps ops = IoUringIoOps.newConnect(
1076                 fd, flags((byte) 0), remoteAddressMemoryAddress, nextOpsId());
1077         connectId = registration.submit(ops);
1078         if (connectId == 0) {
1079             // Directly release the memory if submitting failed.
1080             freeRemoteAddressMemory();
1081         }
1082     }
1083 
1084     @Override
1085     protected Object filterOutboundMessage(Object msg) {
1086         if (msg instanceof ByteBuf) {
1087             ByteBuf buf = (ByteBuf) msg;
1088             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1089         }
1090         throw new UnsupportedOperationException("unsupported message type");
1091     }
1092 
1093     @Override
1094     protected void doRegister(ChannelPromise promise) {
1095         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1096         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1097             if (f.isSuccess()) {
1098                 registration = (IoUringIoRegistration) f.getNow();
1099                 promise.setSuccess();
1100             } else {
1101                 promise.setFailure(f.cause());
1102             }
1103         });
1104     }
1105 
1106     @Override
1107     protected final void doDeregister() {
1108         // Cancel all previous submitted ops.
1109         ioUringUnsafe().cancelOps(connectPromise != null);
1110     }
1111 
1112     @Override
1113     protected void doBind(final SocketAddress local) throws Exception {
1114         if (local instanceof InetSocketAddress) {
1115             checkResolvable((InetSocketAddress) local);
1116         }
1117         socket.bind(local);
1118         this.local = socket.localAddress();
1119     }
1120 
1121     protected static void checkResolvable(InetSocketAddress addr) {
1122         if (addr.isUnresolved()) {
1123             throw new UnresolvedAddressException();
1124         }
1125     }
1126 
    /**
     * Returns the cached local address without querying the socket.
     */
    @Override
    protected final SocketAddress localAddress0() {
        return local;
    }
1131 
    /**
     * Returns the cached remote address without querying the socket.
     */
    @Override
    protected final SocketAddress remoteAddress0() {
        return remote;
    }
1136 
1137     private static boolean isAllowHalfClosure(ChannelConfig config) {
1138         return config instanceof SocketChannelConfig &&
1139                ((SocketChannelConfig) config).isAllowHalfClosure();
1140     }
1141 
1142     private void cancelConnectTimeoutFuture() {
1143         if (connectTimeoutFuture != null) {
1144             connectTimeoutFuture.cancel(false);
1145             connectTimeoutFuture = null;
1146         }
1147     }
1148 
1149     private void computeRemote() {
1150         if (requestedRemoteAddress instanceof InetSocketAddress) {
1151             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1152         }
1153     }
1154 
1155     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1156         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1157     }
1158 
1159     private void clearPollFlag(int pollMask) {
1160         if ((pollMask & Native.POLLIN) != 0) {
1161             ioState &= ~POLL_IN_SCHEDULED;
1162             pollInId = 0;
1163         }
1164         if ((pollMask & Native.POLLOUT) != 0) {
1165             ioState &= ~POLL_OUT_SCHEDULED;
1166             pollOutId = 0;
1167         }
1168         if ((pollMask & Native.POLLRDHUP) != 0) {
1169             ioState &= ~POLL_RDHUP_SCHEDULED;
1170             pollRdhupId = 0;
1171         }
1172     }
1173 
    /**
     * Returns whether the socket was guaranteed to be empty at the time the submitted io was executed and
     * the completion event was created.
     *
     * @param flags     the flags that were part of the completion
     * @return          {@code true} if empty.
     */
    protected abstract boolean socketIsEmpty(int flags);
1181 }