View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.DomainSocketAddress;
43  import io.netty.channel.unix.Errors;
44  import io.netty.channel.unix.FileDescriptor;
45  import io.netty.channel.unix.UnixChannel;
46  import io.netty.channel.unix.UnixChannelUtil;
47  import io.netty.util.ReferenceCountUtil;
48  import io.netty.util.concurrent.PromiseNotifier;
49  import io.netty.util.internal.CleanableDirectBuffer;
50  import io.netty.util.internal.StringUtil;
51  import io.netty.util.internal.logging.InternalLogger;
52  import io.netty.util.internal.logging.InternalLoggerFactory;
53  
54  import java.io.IOException;
55  import java.net.InetSocketAddress;
56  import java.net.SocketAddress;
57  import java.nio.ByteBuffer;
58  import java.nio.channels.AlreadyConnectedException;
59  import java.nio.channels.ClosedChannelException;
60  import java.nio.channels.ConnectionPendingException;
61  import java.nio.channels.NotYetConnectedException;
62  import java.nio.channels.UnresolvedAddressException;
63  import java.util.concurrent.ScheduledFuture;
64  import java.util.concurrent.TimeUnit;
65  
66  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
67  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
68  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
69  import static io.netty.util.internal.ObjectUtil.checkNotNull;
70  import static io.netty.util.internal.StringUtil.className;
71  
72  
73  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
74      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
75      final LinuxSocket socket;
76      protected volatile boolean active;
77  
78      // Different masks for outstanding I/O operations.
79      private static final int POLL_IN_SCHEDULED = 1;
80      private static final int POLL_OUT_SCHEDULED = 1 << 2;
81      private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
82      private static final int WRITE_SCHEDULED = 1 << 4;
83      private static final int READ_SCHEDULED = 1 << 5;
84      private static final int CONNECT_SCHEDULED = 1 << 6;
85  
86      private short opsId = Short.MIN_VALUE;
87  
88      private long pollInId;
89      private long pollOutId;
90      private long pollRdhupId;
91      private long connectId;
92  
93      // A byte is enough for now.
94      private byte ioState;
95  
96      // It's possible that multiple read / writes are issued. We need to keep track of these.
97      // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
98      // enough but let's be a bit more flexible for now.
99      private short numOutstandingWrites;
100     // A value of -1 means that multi-shot is used and so reads will be issued as long as the request is not canceled.
101     private short numOutstandingReads;
102 
103     private boolean readPending;
104     private boolean inReadComplete;
105     private boolean socketHasMoreData;
106 
107     private static final class DelayedClose {
108         private final ChannelPromise promise;
109         private final Throwable cause;
110         private final ClosedChannelException closeCause;
111 
112         DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
113             this.promise = promise;
114             this.cause = cause;
115             this.closeCause = closeCause;
116         }
117     }
118     private DelayedClose delayedClose;
119     private boolean inputClosedSeenErrorOnRead;
120 
121     /**
122      * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
123      */
124     private ChannelPromise connectPromise;
125     private ScheduledFuture<?> connectTimeoutFuture;
126     private SocketAddress requestedRemoteAddress;
127     private CleanableDirectBuffer cleanable;
128     private ByteBuffer remoteAddressMemory;
129     private MsgHdrMemoryArray msgHdrMemoryArray;
130 
131     private IoRegistration registration;
132 
133     private volatile SocketAddress local;
134     private volatile SocketAddress remote;
135 
136     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
137         super(parent);
138         this.socket = checkNotNull(socket, "fd");
139 
140         if (active) {
141             // Directly cache the remote and local addresses
142             // See https://github.com/netty/netty/issues/2359
143             this.active = true;
144             this.local = socket.localAddress();
145             this.remote = socket.remoteAddress();
146         }
147 
148         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
149     }
150 
151     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
152         super(parent);
153         this.socket = checkNotNull(fd, "fd");
154         this.active = true;
155 
156         // Directly cache the remote and local addresses
157         // See https://github.com/netty/netty/issues/2359
158         this.remote = remote;
159         this.local = fd.localAddress();
160     }
161 
162     // Called once a Channel changed from AUTO_READ=true to AUTO_READ=false
163     final void autoReadCleared() {
164         if (!isRegistered()) {
165             return;
166         }
167         IoRegistration registration = this.registration;
168         if (registration == null || !registration.isValid()) {
169             return;
170         }
171         if (eventLoop().inEventLoop()) {
172             clearRead();
173         } else {
174             eventLoop().execute(this::clearRead);
175         }
176     }
177 
178     private void clearRead() {
179         assert eventLoop().inEventLoop();
180         readPending = false;
181         IoRegistration registration = this.registration;
182         if (registration == null || !registration.isValid()) {
183             return;
184         }
185         // Also cancel all outstanding reads as the user did signal there is no more desire to read.
186         cancelOutstandingReads(registration(), numOutstandingReads);
187     }
188 
189     /**
190      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
191      *
192      * @return  opsId
193      */
194     protected final short nextOpsId() {
195         short id = opsId++;
196 
197         // We use 0 for "none".
198         if (id == 0) {
199             id = opsId++;
200         }
201         return id;
202     }
203 
204     public final boolean isOpen() {
205         return socket.isOpen();
206     }
207 
208     @Override
209     public boolean isActive() {
210         return active;
211     }
212 
213     @Override
214     public final FileDescriptor fd() {
215         return socket;
216     }
217 
218     private AbstractUringUnsafe ioUringUnsafe() {
219         return (AbstractUringUnsafe) unsafe();
220     }
221 
222     @Override
223     protected boolean isCompatible(final EventLoop loop) {
224         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
225     }
226 
227     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
228         return newDirectBuffer(buf, buf);
229     }
230 
231     protected boolean allowMultiShotPollIn() {
232         return IoUring.isPollAddMultishotEnabled();
233     }
234 
235     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
236         final int readableBytes = buf.readableBytes();
237         if (readableBytes == 0) {
238             ReferenceCountUtil.release(holder);
239             return Unpooled.EMPTY_BUFFER;
240         }
241 
242         final ByteBufAllocator alloc = alloc();
243         if (alloc.isDirectBufferPooled()) {
244             return newDirectBuffer0(holder, buf, alloc, readableBytes);
245         }
246 
247         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
248         if (directBuf == null) {
249             return newDirectBuffer0(holder, buf, alloc, readableBytes);
250         }
251 
252         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
253         ReferenceCountUtil.safeRelease(holder);
254         return directBuf;
255     }
256 
257     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
258         final ByteBuf directBuf = alloc.directBuffer(capacity);
259         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
260         ReferenceCountUtil.safeRelease(holder);
261         return directBuf;
262     }
263 
264     /**
265      * Cancel all outstanding reads
266      *
267      * @param registration          the {@link IoRegistration}.
268      * @param numOutstandingReads   the number of outstanding reads, or {@code -1} if multi-shot was used.
269      */
270     protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
271 
272     /**
273      * Cancel all outstanding writes
274      *
275      * @param registration          the {@link IoRegistration}.
276      * @param numOutstandingWrites  the number of outstanding writes.
277      */
278     protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
279 
280     @Override
281     protected void doDisconnect() throws Exception {
282     }
283 
284     private void freeRemoteAddressMemory() {
285         if (remoteAddressMemory != null) {
286             cleanable.clean();
287             cleanable = null;
288             remoteAddressMemory = null;
289         }
290     }
291 
292     private void freeMsgHdrArray() {
293         if (msgHdrMemoryArray != null) {
294             msgHdrMemoryArray.release();
295             msgHdrMemoryArray = null;
296         }
297     }
298 
299     @Override
300     protected void doClose() throws Exception {
301         active = false;
302 
303         if (registration != null) {
304             if (socket.markClosed()) {
305                 int fd = fd().intValue();
306                 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
307                 registration.submit(ops);
308             }
309         } else {
310             // This one was never registered just use a syscall to close.
311             socket.close();
312             ioUringUnsafe().unregistered();
313         }
314     }
315 
316     @Override
317     protected final void doBeginRead() {
318         if (inputClosedSeenErrorOnRead) {
319             // We did see an error while reading and so closed the input. Stop reading.
320             return;
321         }
322         if (readPending) {
323             // We already have a read pending.
324             return;
325         }
326         readPending = true;
327         if (inReadComplete || !isActive()) {
328             // We are currently in the readComplete(...) callback which might issue more reads by itself.
329             // If readComplete(...) will not issue more reads itself it will pick up the readPending flag, reset it and
330             // call doBeginReadNow().
331             return;
332         }
333         doBeginReadNow();
334     }
335 
336     private void doBeginReadNow() {
337         if (inputClosedSeenErrorOnRead) {
338             // We did see an error while reading and so closed the input.
339             return;
340         }
341         if (!isPollInFirst() ||
342                 // If the socket was not empty, and we stopped reading we need to ensure we just force the
343                 // read as POLLIN might be edge-triggered (in case of POLL_ADD_MULTI).
344                 socketHasMoreData) {
345             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
346             ioUringUnsafe().scheduleFirstReadIfNeeded();
347         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
348             ioUringUnsafe().schedulePollIn();
349         }
350     }
351 
352     @Override
353     protected void doWrite(ChannelOutboundBuffer in) {
354         scheduleWriteIfNeeded(in, true);
355     }
356 
357     protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
358         if ((ioState & WRITE_SCHEDULED) != 0) {
359             return;
360         }
361         if (scheduleWrite(in) > 0) {
362             ioState |= WRITE_SCHEDULED;
363             if (submitAndRunNow && !isWritable()) {
364                 submitAndRunNow();
365             }
366         }
367     }
368 
369     protected void submitAndRunNow() {
370         // NOOP
371     }
372 
373     private int scheduleWrite(ChannelOutboundBuffer in) {
374         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
375             return 0;
376         }
377         if (in == null) {
378             return 0;
379         }
380 
381         int msgCount = in.size();
382         if (msgCount == 0) {
383             return 0;
384         }
385         Object msg = in.current();
386 
387         if (msgCount > 1 && in.current() instanceof ByteBuf) {
388             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
389         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
390                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
391             // We also need some special handling for CompositeByteBuf
392             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
393         } else {
394             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
395         }
396         // Ensure we never overflow
397         assert numOutstandingWrites > 0;
398         return numOutstandingWrites;
399     }
400 
401     protected final IoRegistration registration() {
402         assert registration != null;
403         return registration;
404     }
405 
406     private void schedulePollOut() {
407         pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
408     }
409 
410     final void schedulePollRdHup() {
411         pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
412     }
413 
414     protected abstract boolean isStreamSocket();
415 
416     private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
417         assert (ioState & ioMask) == 0;
418         int fd = fd().intValue();
419         IoRegistration registration = registration();
420         IoUringIoOps ops = IoUringIoOps.newPollAdd(
421                 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
422         long id = registration.submit(ops);
423         if (id != 0) {
424             ioState |= (byte) ioMask;
425         }
426         return id;
427     }
428 
429     final void resetCachedAddresses() {
430         local = socket.localAddress();
431         remote = socket.remoteAddress();
432     }
433 
434     protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
435         private IoUringRecvByteAllocatorHandle allocHandle;
436         private boolean closed;
437         private boolean socketIsEmpty;
438         private ChannelPromise deregisterPromise;
439 
440         /**
441          * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
442          * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
443          */
444         protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
445 
446         /**
447          * Schedule the write of a single message and returns the number of
448          * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
449          */
450         protected abstract int scheduleWriteSingle(Object msg);
451 
452         @Override
453         public final void handle(IoRegistration registration, IoEvent ioEvent) {
454             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
455             byte op = event.opcode();
456             int res = event.res();
457             int flags = event.flags();
458             short data = event.data();
459             switch (op) {
460                 case Native.IORING_OP_RECV:
461                 case Native.IORING_OP_ACCEPT:
462                 case Native.IORING_OP_RECVMSG:
463                 case Native.IORING_OP_READ:
464                     readComplete(op, res, flags, data);
465                     break;
466                 case Native.IORING_OP_WRITEV:
467                 case Native.IORING_OP_SEND:
468                 case Native.IORING_OP_SENDMSG:
469                 case Native.IORING_OP_WRITE:
470                 case Native.IORING_OP_SPLICE:
471                 case Native.IORING_OP_SEND_ZC:
472                 case Native.IORING_OP_SENDMSG_ZC:
473                     writeComplete(op, res, flags, data);
474                     break;
475                 case Native.IORING_OP_POLL_ADD:
476                     pollAddComplete(res, flags, data);
477                     break;
478                 case Native.IORING_OP_ASYNC_CANCEL:
479                     cancelComplete0(op, res, flags, data);
480                     break;
481                 case Native.IORING_OP_CONNECT:
482                     connectComplete(op, res, flags, data);
483 
484                     // once the connect was completed we can also free some resources that are not needed anymore.
485                     freeMsgHdrArray();
486                     freeRemoteAddressMemory();
487                     break;
488                 case Native.IORING_OP_CLOSE:
489                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
490                         if (delayedClose != null) {
491                             delayedClose.promise.setSuccess();
492                         }
493                         closed = true;
494                     }
495                     break;
496                 default:
497                     break;
498             }
499 
500             // We delay the actual close if there is still a write or read scheduled, let's see if there
501             // was a close that needs to be done now.
502             handleDelayedClosed();
503 
504             if (ioState == 0 && (closed || !isRegistered())) {
505                 // Cancel the registration now.
506                 registration.cancel();
507             }
508         }
509 
510         @Override
511         public void unregistered() {
512             freeMsgHdrArray();
513             freeRemoteAddressMemory();
514 
515             // Check if we need to notify about the deregistration.
516             if (deregisterPromise != null) {
517                 ChannelPromise promise = deregisterPromise;
518                 deregisterPromise = null;
519                 promise.setSuccess();
520             }
521         }
522 
523         private void handleDelayedClosed() {
524             if (delayedClose != null && canCloseNow()) {
525                 closeNow();
526             }
527         }
528 
529         private void pollAddComplete(int res, int flags, short data) {
530             if ((res & Native.POLLOUT) != 0) {
531                 pollOut(res);
532             }
533             if ((res & Native.POLLIN) != 0) {
534                 pollIn(res, flags, data);
535             }
536             if ((res & Native.POLLRDHUP) != 0) {
537                 pollRdHup(res);
538             }
539         }
540 
541         @Override
542         public final void close() throws Exception {
543             close(voidPromise());
544         }
545 
546         @Override
547         protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
548             if (closeFuture().isDone()) {
549                 // Closed already before.
550                 safeSetSuccess(promise);
551                 return;
552             }
553             if (delayedClose == null) {
554                 // We have a write operation pending that should be completed asap.
555                 // We will do the actual close operation one this write result is returned as otherwise
556                 // we may get into trouble as we may close the fd while we did not process the write yet.
557                 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
558             } else {
559                 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
560                 return;
561             }
562 
563             boolean cancelConnect = false;
564             try {
565                 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
566                 if (connectPromise != null) {
567                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
568                     connectPromise.tryFailure(new ClosedChannelException());
569                     AbstractIoUringChannel.this.connectPromise = null;
570                     cancelConnect = true;
571                 }
572 
573                 cancelConnectTimeoutFuture();
574             } finally {
575                 // It's important we cancel all outstanding connect, write and read operations now so
576                 // we will be able to process a delayed close if needed.
577                 cancelOps(cancelConnect);
578             }
579 
580             if (canCloseNow()) {
581                 // Currently there are is no WRITE and READ scheduled so we can start to teardown the channel.
582                 closeNow();
583             }
584         }
585 
586         private boolean cancelOps(boolean cancelConnect) {
587             if (registration == null || !registration.isValid()) {
588                 return false;
589             }
590             boolean cancelled = false;
591             byte flags = (byte) 0;
592             if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
593                 long id = registration.submit(
594                         IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
595                 assert id != 0;
596                 pollRdhupId = 0;
597                 cancelled = true;
598             }
599             if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
600                 long id = registration.submit(
601                         IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
602                 assert id != 0;
603                 pollInId = 0;
604                 cancelled = true;
605             }
606             if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
607                 long id = registration.submit(
608                         IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
609                 assert id != 0;
610                 pollOutId = 0;
611                 cancelled = true;
612             }
613             if (cancelConnect && connectId != 0) {
614                 // Best effort to cancel the already submitted connect request.
615                 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
616                 assert id != 0;
617                 connectId = 0;
618                 cancelled = true;
619             }
620             if (numOutstandingReads != 0 || numOutstandingWrites != 0) {
621                 cancelled = true;
622             }
623             cancelOutstandingReads(registration, numOutstandingReads);
624             cancelOutstandingWrites(registration, numOutstandingWrites);
625             return cancelled;
626         }
627 
628         private boolean canCloseNow() {
629             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
630             // problems related to re-ordering of completions.
631             return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
632         }
633 
634         protected boolean canCloseNow0() {
635             return true;
636         }
637 
638         private void closeNow() {
639             super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
640         }
641 
642         @Override
643         protected final void flush0() {
644             // Flush immediately only when there's no pending flush.
645             // If there's a pending flush operation, event loop will call forceFlush() later,
646             // and thus there's no need to call it now.
647             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
648                 super.flush0();
649             }
650         }
651 
652         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
653             if (promise == null) {
654                 // Closed via cancellation and the promise has been notified already.
655                 return;
656             }
657 
658             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
659             promise.tryFailure(cause);
660             closeIfClosed();
661         }
662 
663         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
664             if (promise == null) {
665                 // Closed via cancellation and the promise has been notified already.
666                 return;
667             }
668             active = true;
669 
670             if (local == null) {
671                 local = socket.localAddress();
672             }
673             computeRemote();
674 
675             if (isStreamSocket()) {
676                 // Register POLLRDHUP
677                 schedulePollRdHup();
678             }
679 
680             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
681             // We still need to ensure we call fireChannelActive() in this case.
682             boolean active = isActive();
683 
684             // trySuccess() will return false if a user cancelled the connection attempt.
685             boolean promiseSet = promise.trySuccess();
686 
687             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
688             // because what happened is what happened.
689             if (!wasActive && active) {
690                 pipeline().fireChannelActive();
691             }
692 
693             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
694             if (!promiseSet) {
695                 close(voidPromise());
696             }
697         }
698 
699         @Override
700         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
701             if (allocHandle == null) {
702                 allocHandle = new IoUringRecvByteAllocatorHandle(
703                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
704             }
705             return allocHandle;
706         }
707 
708         final void shutdownInput(boolean allDataRead) {
709             logger.trace("shutdownInput Fd: {}", fd().intValue());
710             if (!socket.isInputShutdown()) {
711                 if (isAllowHalfClosure(config())) {
712                     try {
713                         socket.shutdown(true, false);
714                     } catch (IOException ignored) {
715                         // We attempted to shutdown and failed, which means the input has already effectively been
716                         // shutdown.
717                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
718                         return;
719                     } catch (NotYetConnectedException ignore) {
720                         // We attempted to shutdown and failed, which means the input has already effectively been
721                         // shutdown.
722                     }
723                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
724                 } else {
725                     // Handle this same way as if we did read all data so we don't schedule another read.
726                     inputClosedSeenErrorOnRead = true;
727                     close(voidPromise());
728                     return;
729                 }
730             }
731             if (allDataRead && !inputClosedSeenErrorOnRead) {
732                 inputClosedSeenErrorOnRead = true;
733                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
734             }
735         }
736 
737         private void fireEventAndClose(Object evt) {
738             pipeline().fireUserEventTriggered(evt);
739             close(voidPromise());
740         }
741 
742         final void schedulePollIn() {
743             assert (ioState & POLL_IN_SCHEDULED) == 0;
744             if (!isActive() || shouldBreakIoUringInReady(config())) {
745                 return;
746             }
747             pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
748         }
749 
750         private void readComplete(byte op, int res, int flags, short data) {
751             assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;
752 
753             boolean multishot = numOutstandingReads == -1;
754             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
755             if (rearm) {
756                 // Reset READ_SCHEDULED if there is nothing more to handle and so we need to re-arm. This works for
757                 // multi-shot and non multi-shot variants.
758                 ioState &= ~READ_SCHEDULED;
759             }
760             boolean pending = readPending;
761             if (multishot) {
762                 // Reset readPending so we can still keep track if we might need to cancel the multi-shot read or
763                 // not.
764                 readPending = false;
765             } else if (--numOutstandingReads == 0) {
766                 // We received all outstanding completions.
767                 readPending = false;
768                 ioState &= ~READ_SCHEDULED;
769             }
770             inReadComplete = true;
771             try {
772                 socketIsEmpty = socketIsEmpty(flags);
773                 socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
774                         (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
775                 readComplete0(op, res, flags, data, numOutstandingReads);
776             } finally {
777                 try {
778                     // Check if we should consider the read loop to be done.
779                     if (recvBufAllocHandle().isReadComplete()) {
780                         // Reset the handle as we are done with the read-loop.
781                         recvBufAllocHandle().reset(config());
782 
783                         // Check if this was a readComplete(...) triggered by a read or multi-shot read.
784                         if (!multishot) {
785                             if (readPending) {
786                                 // This was a "normal" read and the user did signal we should continue reading.
787                                 // Let's schedule the read now.
788                                 doBeginReadNow();
789                             }
790                         } else {
791                             // The readComplete(...) was triggered by a multi-shot read. Because of this the state
792                             // machine is a bit more complicated.
793 
794                             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
795                                 // The readComplete(...) was triggered because the previous read was cancelled.
796                                 // In this case we we need to check if the user did signal the desire to read again
797                                 // in the meantime. If this is the case we need to schedule the read to ensure
798                                 // we do not stall.
799                                 if (pending) {
800                                     doBeginReadNow();
801                                 }
802                             } else if (rearm) {
803                                 // We need to rearm the multishot as otherwise we might miss some data.
804                                 doBeginReadNow();
805                             } else if (!readPending) {
806                                 // Cancel the multi-shot read now as the user did not signal that we want to keep
807                                 // reading while we handle the completion event.
808                                 cancelOutstandingReads(registration, numOutstandingReads);
809                             }
810                         }
811                     } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
812                         // The readComplete(...) was triggered because the previous read was cancelled.
813                         // In this case we we need to check if the user did signal the desire to read again
814                         // in the meantime. If this is the case we need to schedule the read to ensure
815                         // we do not stall.
816                         if (pending) {
817                             doBeginReadNow();
818                         }
819                     } else if (multishot && rearm) {
820                         // We need to rearm the multishot as otherwise we might miss some data.
821                         doBeginReadNow();
822                     }
823                 } finally {
824                     inReadComplete = false;
825                     socketIsEmpty = false;
826                 }
827             }
828         }
829 
830         /**
831          * Called once a read was completed.
832          */
833         protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
834 
835         /**
836          * Called once POLLRDHUP event is ready to be processed
837          */
838         private void pollRdHup(int res) {
839             ioState &= ~POLL_RDHUP_SCHEDULED;
840             pollRdhupId = 0;
841             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
842                 return;
843             }
844 
845             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
846             recvBufAllocHandle().rdHupReceived();
847 
848             if (isActive()) {
849                 scheduleFirstReadIfNeeded();
850             } else {
851                 // Just to be safe make sure the input marked as closed.
852                 shutdownInput(false);
853             }
854         }
855 
856         /**
857          * Called once POLLIN event is ready to be processed
858          */
859         private void pollIn(int res, int flags, short data) {
860             // Check if we need to rearm. This works for both cases, POLL_ADD and POLL_ADD_MULTI.
861             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
862             if (rearm) {
863                 ioState &= ~POLL_IN_SCHEDULED;
864                 pollInId = 0;
865             }
866             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
867                 return;
868             }
869             if (!readPending) {
870                 // We received the POLLIN but the user is not interested yet in reading, just mark socketHasMoreData
871                 // as true so we will trigger a read directly once the user calls read()
872                 socketHasMoreData = true;
873                 return;
874             }
875             scheduleFirstReadIfNeeded();
876         }
877 
878         private void scheduleFirstReadIfNeeded() {
879             if ((ioState & READ_SCHEDULED) == 0) {
880                 scheduleFirstRead();
881             }
882         }
883 
884         private void scheduleFirstRead() {
885             // This is a new "read loop" so we need to reset the allocHandle.
886             final ChannelConfig config = config();
887             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
888             allocHandle.reset(config);
889             scheduleRead(true);
890         }
891 
892         protected final void scheduleRead(boolean first) {
893             // Only schedule another read if the fd is still open.
894             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
895                 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
896                 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
897                     ioState |= READ_SCHEDULED;
898                 }
899             }
900         }
901 
902         /**
903          * Schedule a read and returns the number of {@link #readComplete(byte, int, int, short)}
904          * calls that are expected because of the scheduled read.
905          *
906          * @param first             {@code true} if this is the first read of a read loop.
907          * @param socketIsEmpty     {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
908          * @return                  the number of {@link #readComplete(byte, int, int, short)} calls expected or
909          *                          {@code -1} if {@link #readComplete(byte, int, int, short)} is called until
910          *                          the read is cancelled (multi-shot).
911          */
912         protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
913 
914         /**
915          * Called once POLLOUT event is ready to be processed
916          *
917          * @param res   the result.
918          */
919         private void pollOut(int res) {
920             ioState &= ~POLL_OUT_SCHEDULED;
921             pollOutId = 0;
922             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
923                 return;
924             }
925             // pending connect
926             if (connectPromise != null) {
927                 // Note this method is invoked by the event loop only if the connection attempt was
928                 // neither cancelled nor timed out.
929 
930                 assert eventLoop().inEventLoop();
931 
932                 boolean connectStillInProgress = false;
933                 try {
934                     boolean wasActive = isActive();
935                     if (!socket.finishConnect()) {
936                         connectStillInProgress = true;
937                         return;
938                     }
939                     fulfillConnectPromise(connectPromise, wasActive);
940                 } catch (Throwable t) {
941                     fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
942                 } finally {
943                     if (!connectStillInProgress) {
944                         // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0
945                         // is used
946                         // See https://github.com/netty/netty/issues/1770
947                         cancelConnectTimeoutFuture();
948                         connectPromise = null;
949                     } else {
950                         // The connect was not done yet, register for POLLOUT again
951                         schedulePollOut();
952                     }
953                 }
954             } else if (!socket.isOutputShutdown()) {
955                 // Try writing again
956                 super.flush0();
957             }
958         }
959 
960         /**
961          * Called once a write was completed.
962          *
963          * @param op    the op code.
964          * @param res   the result.
965          * @param flags the flags.
966          * @param data  the data that was passed when submitting the op.
967          */
968         private void writeComplete(byte op, int res, int flags, short data) {
969             if ((ioState & CONNECT_SCHEDULED) != 0) {
970                 // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
971                 // TCP_FASTOPEN_CONNECT.
972                 freeMsgHdrArray();
973                 if (res > 0) {
974                     // Connect complete!
975                     outboundBuffer().removeBytes(res);
976 
977                     // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
978                     connectComplete(op, 0, flags, data);
979                 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
980                     // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
981                     // In this case, our TCP connection will be established normally, but no data was transmitted at
982                     // this time. We'll just transmit the data with normal writes later.
983                     // Let's submit a normal connect.
984                     submitConnect((InetSocketAddress) requestedRemoteAddress);
985                 } else {
986                     // There was an error, handle it as a normal connect error.
987                     connectComplete(op, res, flags, data);
988                 }
989                 return;
990             }
991 
992             if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
993                 assert numOutstandingWrites > 0;
994                 --numOutstandingWrites;
995             }
996 
997             boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
998             if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
999 
1000                 // We were not able to write everything, let's register for POLLOUT
1001                 schedulePollOut();
1002             }
1003 
1004             // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
1005             // while still removing messages internally in removeBytes(...) which then may corrupt state.
1006             if (numOutstandingWrites == 0) {
1007                 ioState &= ~WRITE_SCHEDULED;
1008 
1009                 // If we could write all and we did not schedule a pollout yet let us try to write again
1010                 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
1011                     scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
1012                 }
1013             }
1014         }
1015 
1016         /**
1017          * Called once a write was completed.
1018          * @param op            the op code
1019          * @param res           the result.
1020          * @param flags         the flags.
1021          * @param data          the data that was passed when submitting the op.
1022          * @param outstanding   the outstanding write completions.
1023          */
1024         abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1025 
1026         /**
1027          * Called once a cancel was completed.
1028          *
1029          * @param op            the op code
1030          * @param res           the result.
1031          * @param flags         the flags.
1032          * @param data          the data that was passed when submitting the op.
1033          */
1034         void cancelComplete0(byte op, int res, int flags, short data) {
1035             // NOOP
1036         }
1037 
1038         /**
1039          * Called once a connect was completed.
1040          * @param op            the op code.
1041          * @param res           the result.
1042          * @param flags         the flags.
1043          * @param data          the data that was passed when submitting the op.
1044          */
1045         void connectComplete(byte op, int res, int flags, short data) {
1046             ioState &= ~CONNECT_SCHEDULED;
1047             freeRemoteAddressMemory();
1048 
1049             if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1050                 // connect not complete yet need to wait for poll_out event
1051                 schedulePollOut();
1052             } else {
1053                 try {
1054                     if (res == 0) {
1055                         fulfillConnectPromise(connectPromise, active);
1056                         if (readPending) {
1057                             doBeginReadNow();
1058                         }
1059                     } else {
1060                         try {
1061                             Errors.throwConnectException("io_uring connect", res);
1062                         } catch (Throwable cause) {
1063                             fulfillConnectPromise(connectPromise, cause);
1064                         }
1065                     }
1066                 } finally {
1067                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
1068                     // used
1069                     // See https://github.com/netty/netty/issues/1770
1070                     cancelConnectTimeoutFuture();
1071                     connectPromise = null;
1072                 }
1073             }
1074         }
1075 
1076         @Override
1077         public void connect(
1078                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
1079             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
1080             // non-blocking io.
1081             if (promise.isDone() || !ensureOpen(promise)) {
1082                 return;
1083             }
1084 
1085             if (delayedClose != null) {
1086                 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
1087                 return;
1088             }
1089             try {
1090                 if (connectPromise != null) {
1091                     throw new ConnectionPendingException();
1092                 }
1093                 if (localAddress instanceof InetSocketAddress) {
1094                     checkResolvable((InetSocketAddress) localAddress);
1095                 }
1096 
1097                 if (remoteAddress instanceof InetSocketAddress) {
1098                     checkResolvable((InetSocketAddress) remoteAddress);
1099                 }
1100 
1101                 if (remote != null) {
1102                     // Check if already connected before trying to connect. This is needed as connect(...) will not#
1103                     // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
1104                     // EINPROGRESS and finished later.
1105                     throw new AlreadyConnectedException();
1106                 }
1107 
1108                 if (localAddress != null) {
1109                     socket.bind(localAddress);
1110                 }
1111 
1112                 if (remoteAddress instanceof InetSocketAddress) {
1113                     InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
1114                     ByteBuf initialData = null;
1115                     if (IoUring.isTcpFastOpenClientSideAvailable() &&
1116                         config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
1117                         ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
1118                         outbound.addFlush();
1119                         Object curr;
1120                         if ((curr = outbound.current()) instanceof ByteBuf) {
1121                             initialData = (ByteBuf) curr;
1122                         }
1123                     }
1124                     if (initialData != null) {
1125                         msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
1126                         MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
1127                         hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
1128                                 initialData.readableBytes(), (short) 0);
1129 
1130                         int fd = fd().intValue();
1131                         IoRegistration registration = registration();
1132                         IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
1133                                 hdr.address(), hdr.idx());
1134                         connectId = registration.submit(ops);
1135                         if (connectId == 0) {
1136                             // Directly release the memory if submitting failed.
1137                             freeMsgHdrArray();
1138                         }
1139                     } else {
1140                         submitConnect(inetSocketAddress);
1141                     }
1142                 } else if (remoteAddress instanceof DomainSocketAddress) {
1143                     DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
1144                     submitConnect(unixDomainSocketAddress);
1145                 } else {
1146                     throw new Error("Unexpected SocketAddress implementation " + className(remoteAddress));
1147                 }
1148 
1149                 if (connectId != 0) {
1150                     ioState |= CONNECT_SCHEDULED;
1151                 }
1152             } catch (Throwable t) {
1153                 closeIfClosed();
1154                 promise.tryFailure(annotateConnectException(t, remoteAddress));
1155                 return;
1156             }
1157             connectPromise = promise;
1158             requestedRemoteAddress = remoteAddress;
1159             // Schedule connect timeout.
1160             int connectTimeoutMillis = config().getConnectTimeoutMillis();
1161             if (connectTimeoutMillis > 0) {
1162                 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
1163                     @Override
1164                     public void run() {
1165                         ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
1166                         if (connectPromise != null && !connectPromise.isDone() &&
1167                                 connectPromise.tryFailure(new ConnectTimeoutException(
1168                                         "connection timed out: " + remoteAddress))) {
1169                             close(voidPromise());
1170                         }
1171                     }
1172                 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1173             }
1174 
1175             promise.addListener(new ChannelFutureListener() {
1176                 @Override
1177                 public void operationComplete(ChannelFuture future) {
1178                     // If the connect future is cancelled we also cancel the timeout and close the
1179                     // underlying socket.
1180                     if (future.isCancelled()) {
1181                         cancelConnectTimeoutFuture();
1182                         connectPromise = null;
1183                         close(voidPromise());
1184                     }
1185                 }
1186             });
1187         }
1188 
1189         @Override
1190         public final void deregister(ChannelPromise promise) {
1191             if (deregisterPromise != null) {
1192                 // A deregistration is already in progress.
1193                 PromiseNotifier.cascade(deregisterPromise, promise);
1194             } else if (!isRegistered()) {
1195                 promise.setSuccess();
1196             } else {
1197                 // We need to store a reference to the original promise as we should only notify it once we
1198                 // have handles all pending completions.
1199                 deregisterPromise = promise;
1200                 super.deregister(newPromise().addListener(f -> {
1201                     if (!f.isSuccess()) {
1202                         this.deregisterPromise = null;
1203                         promise.setFailure(f.cause());
1204                     }
1205                 }));
1206             }
1207         }
1208     }
1209 
1210     private void submitConnect(InetSocketAddress inetSocketAddress) {
1211         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1212         remoteAddressMemory = cleanable.buffer();
1213 
1214         SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1215 
1216         int fd = fd().intValue();
1217         IoRegistration registration = registration();
1218         IoUringIoOps ops = IoUringIoOps.newConnect(
1219                 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1220         connectId = registration.submit(ops);
1221         if (connectId == 0) {
1222             // Directly release the memory if submitting failed.
1223             freeRemoteAddressMemory();
1224         }
1225     }
1226 
1227     private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1228         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1229         remoteAddressMemory = cleanable.buffer();
1230         int addrLen = SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1231         int fd = fd().intValue();
1232         IoRegistration registration = registration();
1233         long addr = Buffer.memoryAddress(remoteAddressMemory);
1234         IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, addrLen, nextOpsId());
1235         connectId = registration.submit(ops);
1236         if (connectId == 0) {
1237             // Directly release the memory if submitting failed.
1238             freeRemoteAddressMemory();
1239         }
1240     }
1241 
1242     @Override
1243     protected Object filterOutboundMessage(Object msg) {
1244         if (msg instanceof ByteBuf) {
1245             ByteBuf buf = (ByteBuf) msg;
1246             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1247         }
1248         throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
1249     }
1250 
1251     @Override
1252     protected void doRegister(ChannelPromise promise) {
1253         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1254         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1255             if (f.isSuccess()) {
1256                 registration = (IoRegistration) f.getNow();
1257                 promise.setSuccess();
1258             } else {
1259                 promise.setFailure(f.cause());
1260             }
1261         });
1262     }
1263 
1264     @Override
1265     protected final void doDeregister() {
1266         // Cancel all previous submitted ops.
1267         if (!ioUringUnsafe().cancelOps(connectPromise != null)) {
1268             // It's possible that we never registered anything and so we did not submit any ASYNC_CANCEL.
1269             // In this case directly call cancel as we will not receive any completion at all.
1270             if (registration != null) {
1271                 registration.cancel();
1272             }
1273         }
1274     }
1275 
1276     @Override
1277     protected void doBind(final SocketAddress local) throws Exception {
1278         if (local instanceof InetSocketAddress) {
1279             checkResolvable((InetSocketAddress) local);
1280         }
1281         socket.bind(local);
1282         this.local = socket.localAddress();
1283     }
1284 
1285     protected static void checkResolvable(InetSocketAddress addr) {
1286         if (addr.isUnresolved()) {
1287             throw new UnresolvedAddressException();
1288         }
1289     }
1290 
1291     @Override
1292     protected final SocketAddress localAddress0() {
1293         return local;
1294     }
1295 
1296     @Override
1297     protected final SocketAddress remoteAddress0() {
1298         return remote;
1299     }
1300 
1301     private static boolean isAllowHalfClosure(ChannelConfig config) {
1302         return config instanceof SocketChannelConfig &&
1303                ((SocketChannelConfig) config).isAllowHalfClosure();
1304     }
1305 
1306     private void cancelConnectTimeoutFuture() {
1307         if (connectTimeoutFuture != null) {
1308             connectTimeoutFuture.cancel(false);
1309             connectTimeoutFuture = null;
1310         }
1311     }
1312 
1313     private void computeRemote() {
1314         if (requestedRemoteAddress instanceof InetSocketAddress) {
1315             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1316         }
1317     }
1318 
1319     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1320         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1321     }
1322 
1323     /**
1324      * Return if the socket is guaranteed to be empty when the submitted io was executed and the completion event be
1325      * created.
1326      * @param flags     the flags that were part of the completion
1327      * @return          {@code true} if empty.
1328      */
1329     protected abstract boolean socketIsEmpty(int flags);
1330 
1331     abstract boolean isPollInFirst();
1332 }