View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.socket.ChannelInputShutdownEvent;
38  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39  import io.netty.channel.socket.SocketChannelConfig;
40  import io.netty.channel.unix.Buffer;
41  import io.netty.channel.unix.Errors;
42  import io.netty.channel.unix.FileDescriptor;
43  import io.netty.channel.unix.UnixChannel;
44  import io.netty.channel.unix.UnixChannelUtil;
45  import io.netty.util.ReferenceCountUtil;
46  import io.netty.util.concurrent.PromiseNotifier;
47  import io.netty.util.internal.logging.InternalLogger;
48  import io.netty.util.internal.logging.InternalLoggerFactory;
49  
50  import java.io.IOException;
51  import java.net.InetSocketAddress;
52  import java.net.SocketAddress;
53  import java.nio.ByteBuffer;
54  import java.nio.channels.AlreadyConnectedException;
55  import java.nio.channels.ClosedChannelException;
56  import java.nio.channels.ConnectionPendingException;
57  import java.nio.channels.NotYetConnectedException;
58  import java.nio.channels.UnresolvedAddressException;
59  import java.util.concurrent.ScheduledFuture;
60  import java.util.concurrent.TimeUnit;
61  
62  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
63  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
64  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
65  import static io.netty.util.internal.ObjectUtil.checkNotNull;
66  
67  
68  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
69      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
70      final LinuxSocket socket;
71      protected volatile boolean active;
72  
73      // Different masks for outstanding I/O operations.
        // Each mask is a single bit of ioState.
        // NOTE(review): 1 << 1 is not used by any mask declared here - confirm the gap is intentional.
74      private static final int POLL_IN_SCHEDULED = 1;
75      private static final int POLL_OUT_SCHEDULED = 1 << 2;
76      private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
77      private static final int WRITE_SCHEDULED = 1 << 4;
78      private static final int READ_SCHEDULED = 1 << 5;
79      private static final int CONNECT_SCHEDULED = 1 << 6;
80  
        // Counter behind nextOpsId(); the value 0 is skipped there because it stands for "none".
81      private short opsId = Short.MIN_VALUE;
82  
        // Values returned by registration.submit(...) for the POLL_ADD operations of this channel.
83      private long pollInId;
84      private long pollOutId;
85      private long pollRdhupId;
        // Used by close(...) to cancel an already submitted connect; 0 is treated as "nothing to cancel".
86      private long connectId;
87  
88      // A byte is enough for now.
        // Bit set built from the *_SCHEDULED masks above.
89      private byte ioState;
90  
91      // It's possible that multiple read / writes are issued. We need to keep track of these.
92      // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
93      // enough but let's be a bit more flexible for now.
94      private short numOutstandingWrites;
95      private short numOutstandingReads;
96  
        // readPending: a read was requested while readComplete(...) was running and still has to be started.
        // inReadComplete: true only while readComplete(...) is executing.
97      private boolean readPending;
98      private boolean inReadComplete;
99  
        // delayedClose: set by close(ChannelPromise); while non-null no further reads or writes are scheduled.
        // inputClosedSeenErrorOnRead: set by shutdownInput(...); once true doBeginRead() stops reading.
100     private ChannelPromise delayedClose;
101     private boolean inputClosedSeenErrorOnRead;
102 
103     /**
104      * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
105      */
106     private ChannelPromise connectPromise;
107     private ScheduledFuture<?> connectTimeoutFuture;
108     private SocketAddress requestedRemoteAddress;
109     private ByteBuffer remoteAddressMemory;
110     private MsgHdrMemoryArray msgHdrMemoryArray;
111 
112     private IoUringIoRegistration registration;
113 
114     private volatile SocketAddress local;
115     private volatile SocketAddress remote;
116 
117     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
118         super(parent);
119         this.socket = checkNotNull(socket, "fd");
120 
121         if (active) {
122             // Directly cache the remote and local addresses
123             // See https://github.com/netty/netty/issues/2359
124             this.active = true;
125             this.local = socket.localAddress();
126             this.remote = socket.remoteAddress();
127         }
128 
129         if (parent != null) {
130             logger.trace("Create Channel Socket: {}", socket.intValue());
131         } else {
132             logger.trace("Create Server Socket: {}", socket.intValue());
133         }
134     }
135 
136     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
137         super(parent);
138         this.socket = checkNotNull(fd, "fd");
139         this.active = true;
140 
141         // Directly cache the remote and local addresses
142         // See https://github.com/netty/netty/issues/2359
143         this.remote = remote;
144         this.local = fd.localAddress();
145     }
146 
147     /**
148      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
149      *
150      * @return  opsId
151      */
152     protected final short nextOpsId() {
153         short id = opsId++;
154 
155         // We use 0 for "none".
156         if (id == 0) {
157             id = opsId++;
158         }
159         return id;
160     }
161 
162     public final boolean isOpen() {
163         return socket.isOpen();
164     }
165 
166     @Override
167     public boolean isActive() {
168         return active;
169     }
170 
171     @Override
172     public final FileDescriptor fd() {
173         return socket;
174     }
175 
176     private AbstractUringUnsafe ioUringUnsafe() {
177         return (AbstractUringUnsafe) unsafe();
178     }
179 
180     @Override
181     protected boolean isCompatible(final EventLoop loop) {
182         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
183     }
184 
185     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
186         return newDirectBuffer(buf, buf);
187     }
188 
189     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
190         final int readableBytes = buf.readableBytes();
191         if (readableBytes == 0) {
192             ReferenceCountUtil.release(holder);
193             return Unpooled.EMPTY_BUFFER;
194         }
195 
196         final ByteBufAllocator alloc = alloc();
197         if (alloc.isDirectBufferPooled()) {
198             return newDirectBuffer0(holder, buf, alloc, readableBytes);
199         }
200 
201         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
202         if (directBuf == null) {
203             return newDirectBuffer0(holder, buf, alloc, readableBytes);
204         }
205 
206         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
207         ReferenceCountUtil.safeRelease(holder);
208         return directBuf;
209     }
210 
211     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
212         final ByteBuf directBuf = alloc.directBuffer(capacity);
213         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
214         ReferenceCountUtil.safeRelease(holder);
215         return directBuf;
216     }
217 
218     /**
219      * Cancel all outstanding reads
         * <p>
         * {@code AbstractUringUnsafe.close(ChannelPromise)} calls this while the channel has a registration, so that
         * a delayed close can be processed if needed.
220      *
221      * @param registration          the {@link IoUringIoRegistration}.
222      * @param numOutstandingReads   the number of outstanding reads.
223      */
224     protected abstract void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads);
225 
226     /**
227      * Cancel all outstanding writes
         * <p>
         * {@code AbstractUringUnsafe.close(ChannelPromise)} calls this while the channel has a registration, right
         * after the outstanding reads were cancelled.
228      *
229      * @param registration          the {@link IoUringIoRegistration}.
230      * @param numOutstandingWrites   the number of outstanding writes.
231      */
232     protected abstract void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites);
233 
234     @Override
235     protected void doDisconnect() throws Exception {
            // No-op: this class performs no work on disconnect.
236     }
237 
238     private void freeRemoteAddressMemory() {
239         if (remoteAddressMemory != null) {
240             Buffer.free(remoteAddressMemory);
241             remoteAddressMemory = null;
242         }
243     }
244 
245     private void freeMsgHdrArray() {
246         if (msgHdrMemoryArray != null) {
247             msgHdrMemoryArray.release();
248             msgHdrMemoryArray = null;
249         }
250     }
251 
252     @Override
253     protected void doClose() throws Exception {
254         active = false;
255 
256         if (registration != null) {
257             if (socket.markClosed()) {
258                 int fd = fd().intValue();
259                 IoUringIoOps ops = IoUringIoOps.newClose(fd, 0, nextOpsId());
260                 registration.submit(ops);
261             }
262         } else {
263             // This one was never registered just use a syscall to close.
264             socket.close();
265             freeRemoteAddressMemory();
266             freeMsgHdrArray();
267         }
268     }
269 
270     @Override
271     protected final void doBeginRead() {
272         if (inputClosedSeenErrorOnRead) {
273             // We did see an error while reading and so closed the input. Stop reading.
274             return;
275         }
276         if (readPending) {
277             // We already have a read pending.
278             return;
279         }
280         if (inReadComplete) {
281             // We are currently in the readComplete(...) callback which might issue more reads by itself. Just
282             // mark it as a pending read. If readComplete(...) will not issue more reads itself it will pick up
283             // the readPending flag, reset it and call doBeginReadNow().
284             readPending = true;
285             return;
286         }
287         doBeginReadNow();
288     }
289 
290     private void doBeginReadNow() {
291         if (socket.isBlocking()) {
292             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
293             ioUringUnsafe().scheduleFirstReadIfNeeded();
294         } else {
295             if ((ioState & POLL_IN_SCHEDULED) == 0) {
296                 ioUringUnsafe().schedulePollIn();
297             }
298         }
299     }
300 
301     @Override
302     protected void doWrite(ChannelOutboundBuffer in) {
303         if ((ioState & WRITE_SCHEDULED) != 0) {
304             return;
305         }
306         if (scheduleWrite(in) > 0) {
307             ioState |= WRITE_SCHEDULED;
308         }
309     }
310 
311     private int scheduleWrite(ChannelOutboundBuffer in) {
312         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
313             return 0;
314         }
315         if (in == null) {
316             return 0;
317         }
318 
319         int msgCount = in.size();
320         if (msgCount == 0) {
321             return 0;
322         }
323         Object msg = in.current();
324 
325         if (msgCount > 1) {
326             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
327         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
328                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
329             // We also need some special handling for CompositeByteBuf
330             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
331         } else {
332             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
333         }
334         // Ensure we never overflow
335         assert numOutstandingWrites > 0;
336         return numOutstandingWrites;
337     }
338 
339     protected final IoUringIoRegistration registration() {
340         assert registration != null;
341         return registration;
342     }
343 
344     private void schedulePollOut() {
345         // This should only be done if the socket is non-blocking.
346         assert !socket.isBlocking();
347 
348         assert (ioState & POLL_OUT_SCHEDULED) == 0;
349         int fd = fd().intValue();
350         IoUringIoRegistration registration = registration();
351         IoUringIoOps ops = IoUringIoOps.newPollAdd(
352                 fd, 0, Native.POLLOUT, (short) Native.POLLOUT);
353         pollOutId = registration.submit(ops);
354         ioState |= POLL_OUT_SCHEDULED;
355     }
356 
357     final void schedulePollRdHup() {
358         assert (ioState & POLL_RDHUP_SCHEDULED) == 0;
359         int fd = fd().intValue();
360         IoUringIoRegistration registration = registration();
361         IoUringIoOps ops = IoUringIoOps.newPollAdd(
362                 fd, 0, Native.POLLRDHUP, (short) Native.POLLRDHUP);
363         pollRdhupId = registration.submit(ops);
364         ioState |= POLL_RDHUP_SCHEDULED;
365     }
366 
367     final void resetCachedAddresses() {
368         local = socket.localAddress();
369         remote = socket.remoteAddress();
370     }
371 
372     abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
373         private IoUringRecvByteAllocatorHandle allocHandle; // created on the first recvBufAllocHandle() call
374 
375         /**
376          * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
377          * {@link #writeComplete(int, int, short)} calls that are expected because of the scheduled write.
             * <p>
             * {@code scheduleWrite(...)} uses this when the buffer holds more than one message, or when the single
             * current message is backed by more than one NIO buffer. The returned value is narrowed to a
             * {@code short} there and asserted to be greater than {@code 0}.
             *
             * @param in the {@link ChannelOutboundBuffer} to write from.
             * @return the number of expected write completions.
378          */
379         protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
380 
381         /**
382          * Schedule the write of a single message and returns the number of {@link #writeComplete(int, int, short)}
383          * calls that are expected because of the scheduled write
             * <p>
             * {@code scheduleWrite(...)} uses this for a single current message that is not backed by more than one
             * NIO buffer. The returned value is narrowed to a {@code short} there and asserted to be greater than
             * {@code 0}.
             *
             * @param msg the current message of the {@link ChannelOutboundBuffer}.
             * @return the number of expected write completions.
384          */
385         protected abstract int scheduleWriteSingle(Object msg);
386 
387         private boolean closed; // set by handle(...) once the IORING_OP_CLOSE completion was seen
388 
389         @Override
390         public final void handle(IoRegistration registration, IoEvent ioEvent) {
391             IoUringIoRegistration reg = (IoUringIoRegistration) registration;
392             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
393             byte op = event.opcode();
394             int res = event.res();
395             int flags = event.flags();
396             short data = event.data();
397             switch (op) {
398                 case Native.IORING_OP_RECV:
399                 case Native.IORING_OP_ACCEPT:
400                 case Native.IORING_OP_RECVMSG:
401                 case Native.IORING_OP_READ:
402                     readComplete(res, flags, data);
403 
404                     // We delay the actual close if there is still a write or read scheduled, let's see if there
405                     // was a close that needs to be done now.
406                     handleDelayedClosed();
407                     break;
408                 case Native.IORING_OP_WRITEV:
409                 case Native.IORING_OP_SEND:
410                 case Native.IORING_OP_SENDMSG:
411                 case Native.IORING_OP_WRITE:
412                     writeComplete(res, flags, data);
413 
414                     // We delay the actual close if there is still a write or read scheduled, let's see if there
415                     // was a close that needs to be done now.
416                     handleDelayedClosed();
417                     break;
418                 case Native.IORING_OP_POLL_ADD:
419                     pollAddComplete(res, flags, data);
420                     break;
421                 case Native.IORING_OP_POLL_REMOVE:
422                     // fd reuse can replace a closed channel (with pending POLL_REMOVE CQEs)
423                     // with a new one: better not making it to mess-up with its state!
424                     if (res == 0) {
425                         clearPollFlag(data);
426                     }
427                     break;
428                 case Native.IORING_OP_ASYNC_CANCEL:
429                     cancelComplete0(res, flags, data);
430                     break;
431                 case Native.IORING_OP_CONNECT:
432                     connectComplete(res, flags, data);
433 
434                     // once the connect was completed we can also free some resources that are not needed anymore.
435                     freeMsgHdrArray();
436                     freeRemoteAddressMemory();
437                     break;
438                 case Native.IORING_OP_CLOSE:
439                     if (delayedClose != null) {
440                         delayedClose.setSuccess();
441                     }
442                     closed = true;
443                     break;
444             }
445 
446             if (ioState == 0 && closed) {
447                 freeResourcesNow(reg);
448             }
449         }
450 
451         /**
452          * Free all resources now. No new IO will be submitted for this channel via io_uring
453          *
454          * @param reg   the {@link IoUringIoRegistration}.
455          */
456         protected void freeResourcesNow(IoUringIoRegistration reg) {
457             freeMsgHdrArray();
458             freeRemoteAddressMemory();
459             reg.cancel();
460         }
461 
462         private void handleDelayedClosed() {
463             if (delayedClose != null && canCloseNow()) {
464                 closeNow(newPromise());
465             }
466         }
467 
468         private void pollAddComplete(int res, int flags, short data) {
469             if ((data & Native.POLLOUT) != 0) {
470                 pollOut(res);
471             }
472             if ((data & Native.POLLIN) != 0) {
473                 pollIn(res);
474             }
475             if ((data & Native.POLLRDHUP) != 0) {
476                 pollRdHup(res);
477             }
478         }
479 
480         @Override
481         public final void close() throws Exception {
482             close(voidPromise());
483         }
484 
485         @Override
486         public final void close(ChannelPromise promise) {
487             if (delayedClose == null) {
488                 // We have a write operation pending that should be completed asap.
489                 // We will do the actual close operation one this write result is returned as otherwise
490                 // we may get into trouble as we may close the fd while we did not process the write yet.
491                 delayedClose = promise.isVoid() ? newPromise() : promise;
492             } else {
493                 delayedClose.addListener(new PromiseNotifier<>(false, promise));
494                 return;
495             }
496 
497             boolean cancelConnect = false;
498             try {
499                 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
500                 if (connectPromise != null) {
501                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
502                     connectPromise.tryFailure(new ClosedChannelException());
503                     AbstractIoUringChannel.this.connectPromise = null;
504                     cancelConnect = true;
505                 }
506 
507                 cancelConnectTimeoutFuture();
508             } finally {
509                 // It's important we cancel all outstanding connect, write and read operations now so
510                 // we will be able to process a delayed close if needed.
511                 if (registration != null) {
512                     if (cancelConnect && connectId != 0) {
513                         int fd = fd().intValue();
514                         // Best effort to cancel the already submitted connect request.
515                         registration.submit(IoUringIoOps.newAsyncCancel(
516                                 fd, 0, connectId, Native.IORING_OP_CONNECT));
517                     }
518                     cancelOutstandingReads(registration, numOutstandingReads);
519                     cancelOutstandingWrites(registration, numOutstandingWrites);
520                 }
521             }
522 
523             if (canCloseNow()) {
524                 // Currently there are is no WRITE and READ scheduled so we can start to teardown the channel.
525                 closeNow(newPromise());
526             }
527         }
528 
529         private boolean canCloseNow() {
530             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
531             // problems related to re-ordering of completions.
532             return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
533         }
534 
535         private void closeNow(ChannelPromise promise) {
536             super.close(promise);
537         }
538 
539         @Override
540         protected final void flush0() {
541             // Flush immediately only when there's no pending flush.
542             // If there's a pending flush operation, event loop will call forceFlush() later,
543             // and thus there's no need to call it now.
544             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
545                 super.flush0();
546             }
547         }
548 
549         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
550             if (promise == null) {
551                 // Closed via cancellation and the promise has been notified already.
552                 return;
553             }
554 
555             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
556             promise.tryFailure(cause);
557             closeIfClosed();
558         }
559 
560         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
561             if (promise == null) {
562                 // Closed via cancellation and the promise has been notified already.
563                 return;
564             }
565             active = true;
566 
567             if (local == null) {
568                 local = socket.localAddress();
569             }
570             computeRemote();
571 
572             // Register POLLRDHUP
573             schedulePollRdHup();
574 
575             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
576             // We still need to ensure we call fireChannelActive() in this case.
577             boolean active = isActive();
578 
579             // trySuccess() will return false if a user cancelled the connection attempt.
580             boolean promiseSet = promise.trySuccess();
581 
582             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
583             // because what happened is what happened.
584             if (!wasActive && active) {
585                 pipeline().fireChannelActive();
586             }
587 
588             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
589             if (!promiseSet) {
590                 close(voidPromise());
591             }
592         }
593 
594         @Override
595         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
596             if (allocHandle == null) {
597                 allocHandle = new IoUringRecvByteAllocatorHandle(
598                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
599             }
600             return allocHandle;
601         }
602 
603         final void shutdownInput(boolean rdHup) {
604             logger.trace("shutdownInput Fd: {}", fd().intValue());
605             if (!socket.isInputShutdown()) {
606                 if (isAllowHalfClosure(config())) {
607                     try {
608                         socket.shutdown(true, false);
609                     } catch (IOException ignored) {
610                         // We attempted to shutdown and failed, which means the input has already effectively been
611                         // shutdown.
612                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
613                         return;
614                     } catch (NotYetConnectedException ignore) {
615                         // We attempted to shutdown and failed, which means the input has already effectively been
616                         // shutdown.
617                     }
618                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
619                 } else {
620                     close(voidPromise());
621                 }
622             } else if (!rdHup) {
623                 inputClosedSeenErrorOnRead = true;
624                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
625             }
626         }
627 
628         private void fireEventAndClose(Object evt) {
629             pipeline().fireUserEventTriggered(evt);
630             close(voidPromise());
631         }
632 
633         final void schedulePollIn() {
634             // This should only be done if the socket is non-blocking.
635             assert !socket.isBlocking();
636 
637             assert (ioState & POLL_IN_SCHEDULED) == 0;
638             if (!isActive() || shouldBreakIoUringInReady(config())) {
639                 return;
640             }
641             int fd = fd().intValue();
642             IoUringIoRegistration registration = registration();
643             IoUringIoOps ops = IoUringIoOps.newPollAdd(
644                     fd, 0, Native.POLLIN, (short) Native.POLLIN);
645             pollInId = registration.submit(ops);
646             ioState |= POLL_IN_SCHEDULED;
647         }
648 
649         private void readComplete(int res, int flags, int data) {
650             assert numOutstandingReads > 0;
651             if (--numOutstandingReads == 0) {
652                 readPending = false;
653                 ioState &= ~READ_SCHEDULED;
654             }
655             inReadComplete = true;
656             try {
657                 readComplete0(res, flags, data, numOutstandingReads);
658             } finally {
659                 inReadComplete = false;
660                 // There is a pending read and readComplete0(...) also did stop issue read request.
661                 // Let's trigger the requested read now.
662                 if (readPending && recvBufAllocHandle().isReadComplete()) {
663                     doBeginReadNow();
664                 }
665             }
666         }
667 
668         /**
669          * Called once a read was completed.
             * <p>
             * Runs while {@code inReadComplete} is {@code true}; a read requested through {@code doBeginRead()}
             * during this call is only marked as pending.
             *
             * @param res                   the result of the completion event.
             * @param flags                 the flags of the completion event.
             * @param data                  the data of the completion event.
             * @param outstandingCompletes  the number of reads that are still outstanding after this completion.
670          */
671         protected abstract void readComplete0(int res, int flags, int data, int outstandingCompletes);
672 
673         /**
674          * Called once POLLRDHUP event is ready to be processed
675          */
676         private void pollRdHup(int res) {
677             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
678                 return;
679             }
680             ioState &= ~POLL_RDHUP_SCHEDULED;
681 
682             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
683             recvBufAllocHandle().rdHupReceived();
684 
685             if (isActive()) {
686                 scheduleFirstReadIfNeeded();
687             } else {
688                 // Just to be safe make sure the input marked as closed.
689                 shutdownInput(true);
690             }
691         }
692 
693         /**
694          * Called once POLLIN event is ready to be processed
695          */
696         private void pollIn(int res) {
697             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
698                 return;
699             }
700             ioState &= ~POLL_IN_SCHEDULED;
701 
702             scheduleFirstReadIfNeeded();
703         }
704 
705         private void scheduleFirstReadIfNeeded() {
706             if ((ioState & READ_SCHEDULED) == 0) {
707                 scheduleFirstRead();
708             }
709         }
710 
711         private void scheduleFirstRead() {
712             // This is a new "read loop" so we need to reset the allocHandle.
713             final ChannelConfig config = config();
714             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
715             allocHandle.reset(config);
716             scheduleRead(true);
717         }
718 
719         protected final void scheduleRead(boolean first) {
720             // Only schedule another read if the fd is still open.
721             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
722                 numOutstandingReads = (short) scheduleRead0(first);
723                 if (numOutstandingReads > 0) {
724                     ioState |= READ_SCHEDULED;
725                 }
726             }
727         }
728 
729         /**
730          * Schedule a read and returns the number of {@link #readComplete(int, int, int)} calls that are expected
731          * because of the scheduled read.
             * <p>
             * {@link #scheduleRead(boolean)} narrows the returned value to a {@code short}, stores it as the number
             * of outstanding reads and marks a read as scheduled if it is greater than {@code 0}.
732          *
733          * @param first {@code true} if this is the first read of a read loop.
             * @return the number of expected read completions.
734          */
735         protected abstract int scheduleRead0(boolean first);
736 
737         /**
738          * Called once POLLOUT event is ready to be processed
739          *
740          * @param res   the result.
741          */
742         private void pollOut(int res) {
743             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
744                 return;
745             }
746             ioState &= ~POLL_OUT_SCHEDULED;
747             // pending connect
748             if (connectPromise != null) {
749                 // Note this method is invoked by the event loop only if the connection attempt was
750                 // neither cancelled nor timed out.
751 
752                 assert eventLoop().inEventLoop();
753 
754                 boolean connectStillInProgress = false;
755                 try {
756                     boolean wasActive = isActive();
757                     if (!socket.finishConnect()) {
758                         connectStillInProgress = true;
759                         return;
760                     }
761                     fulfillConnectPromise(connectPromise, wasActive);
762                 } catch (Throwable t) {
763                     fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
764                 } finally {
765                     if (!connectStillInProgress) {
766                         // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0
767                         // is used
768                         // See https://github.com/netty/netty/issues/1770
769                         cancelConnectTimeoutFuture();
770                         connectPromise = null;
771                     } else {
772                         // This should only happen if the socket is non-blocking.
773                         assert !socket.isBlocking();
774 
775                         // The connect was not done yet, register for POLLOUT again
776                         schedulePollOut();
777                     }
778                 }
779             } else if (!socket.isOutputShutdown()) {
780                 // Try writing again
781                 super.flush0();
782             }
783         }
784 
785         /**
786          * Called once a write was completed.
787          *
788          * @param res   the result.
789          * @param flags the flags.
790          * @param data  the data that was passed when submitting the op.
791          */
792         private void writeComplete(int res, int flags, short data) {
793             if ((ioState & CONNECT_SCHEDULED) != 0) {
794                 // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
795                 // TCP_FASTOPEN_CONNECT.
796                 freeMsgHdrArray();
797                 if (res > 0) {
798                     // Connect complete!
799                     outboundBuffer().removeBytes(res);
800 
801                     // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
802                     connectComplete(0, flags, data);
803                 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
804                     // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
805                     // In this case, our TCP connection will be established normally, but no data was transmitted at
806                     // this time. We'll just transmit the data with normal writes later.
807                     // Let's submit a normal connect.
808                     submitConnect((InetSocketAddress) requestedRemoteAddress);
809                 } else {
810                     // There was an error, handle it as a normal connect error.
811                     connectComplete(res, flags, data);
812                 }
813                 return;
814             }
815             assert numOutstandingWrites > 0;
816             --numOutstandingWrites;
817 
818             boolean writtenAll = writeComplete0(res, flags, data, numOutstandingWrites);
819             if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
820                 // This should only happen if the socket is non-blocking.
821                 assert !socket.isBlocking();
822 
823                 // We were not able to write everything, let's register for POLLOUT
824                 schedulePollOut();
825             }
826 
827             // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
828             // while still removing messages internally in removeBytes(...) which then may corrupt state.
829             if (numOutstandingWrites == 0) {
830                 ioState &= ~WRITE_SCHEDULED;
831 
832                 // If we could write all and we did not schedule a pollout yet let us try to write again
833                 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
834                     doWrite(unsafe().outboundBuffer());
835                 }
836             }
837         }
838 
839         /**
840          * Called once a write was completed.
841          *
842          * @param res           the result.
843          * @param flags         the flags.
844          * @param data          the data that was passed when submitting the op.
845          * @param outstanding   the outstanding write completions.
846          */
847         abstract boolean writeComplete0(int res, int flags, int data, int outstanding);
848 
849         /**
850          * Called once a cancel was completed.
851          *
852          * @param res           the result.
853          * @param flags         the flags.
854          * @param data          the data that was passed when submitting the op.
855          */
856         void cancelComplete0(int res, int flags, short data) {
857             // NOOP
858         }
859 
860         /**
861          * Called once a connect was completed.
862          *
863          * @param res           the result.
864          * @param flags         the flags.
865          * @param data          the data that was passed when submitting the op.
866          */
867         void connectComplete(int res, int flags, short data) {
868             ioState &= ~CONNECT_SCHEDULED;
869             freeRemoteAddressMemory();
870 
871             if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
872                 // This should only happen if the socket is non-blocking.
873                 assert !socket.isBlocking();
874 
875                 // connect not complete yet need to wait for poll_out event
876                 schedulePollOut();
877             } else {
878                 try {
879                     if (res == 0) {
880                         fulfillConnectPromise(connectPromise, active);
881                     } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
882                         try {
883                             Errors.throwConnectException("io_uring connect", res);
884                         } catch (Throwable cause) {
885                             fulfillConnectPromise(connectPromise, cause);
886                         }
887                     }
888                 } finally {
889                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
890                     // used
891                     // See https://github.com/netty/netty/issues/1770
892                     cancelConnectTimeoutFuture();
893                     connectPromise = null;
894                 }
895             }
896         }
897 
898         @Override
899         public void connect(
900                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
901             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
902             // non-blocking io.
903             if (promise.isDone() || !ensureOpen(promise)) {
904                 return;
905             }
906 
907             if (delayedClose != null) {
908                 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
909                 return;
910             }
911             try {
912                 if (connectPromise != null) {
913                     throw new ConnectionPendingException();
914                 }
915                 if (localAddress instanceof InetSocketAddress) {
916                     checkResolvable((InetSocketAddress) localAddress);
917                 }
918 
919                 if (remoteAddress instanceof InetSocketAddress) {
920                     checkResolvable((InetSocketAddress) remoteAddress);
921                 }
922 
923                 if (remote != null) {
924                     // Check if already connected before trying to connect. This is needed as connect(...) will not#
925                     // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
926                     // EINPROGRESS and finished later.
927                     throw new AlreadyConnectedException();
928                 }
929 
930                 if (localAddress != null) {
931                     socket.bind(localAddress);
932                 }
933 
934                 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
935 
936                 ByteBuf initialData = null;
937                 if (IoUring.isTcpFastOpenClientSideAvailable() &&
938                         config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
939                     ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
940                     outbound.addFlush();
941                     Object curr;
942                     if ((curr = outbound.current()) instanceof ByteBuf) {
943                         initialData = (ByteBuf) curr;
944                     }
945                 }
946                 if (initialData != null) {
947                     msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
948                     MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
949                     hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
950                             initialData.readableBytes(), (short) 0);
951 
952                     int fd = fd().intValue();
953                     IoUringIoRegistration registration = registration();
954                     IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, 0, Native.MSG_FASTOPEN,
955                             hdr.address(), hdr.idx());
956                     connectId = registration.submit(ops);
957                 } else {
958                     submitConnect(inetSocketAddress);
959                 }
960 
961                 ioState |= CONNECT_SCHEDULED;
962             } catch (Throwable t) {
963                 closeIfClosed();
964                 promise.tryFailure(annotateConnectException(t, remoteAddress));
965                 return;
966             }
967             connectPromise = promise;
968             requestedRemoteAddress = remoteAddress;
969             // Schedule connect timeout.
970             int connectTimeoutMillis = config().getConnectTimeoutMillis();
971             if (connectTimeoutMillis > 0) {
972                 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
973                     @Override
974                     public void run() {
975                         ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
976                         if (connectPromise != null && !connectPromise.isDone() &&
977                                 connectPromise.tryFailure(new ConnectTimeoutException(
978                                         "connection timed out: " + remoteAddress))) {
979                             close(voidPromise());
980                         }
981                     }
982                 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
983             }
984 
985             promise.addListener(new ChannelFutureListener() {
986                 @Override
987                 public void operationComplete(ChannelFuture future) {
988                     // If the connect future is cancelled we also cancel the timeout and close the
989                     // underlying socket.
990                     if (future.isCancelled()) {
991                         cancelConnectTimeoutFuture();
992                         connectPromise = null;
993                         close(voidPromise());
994                     }
995                 }
996             });
997         }
998     }
999 
1000     private void submitConnect(InetSocketAddress inetSocketAddress) {
1001         remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1002         long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
1003 
1004         SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
1005 
1006         int fd = fd().intValue();
1007         IoUringIoRegistration registration = registration();
1008         IoUringIoOps ops = IoUringIoOps.newConnect(
1009                 fd, 0, remoteAddressMemoryAddress, nextOpsId());
1010         connectId = registration.submit(ops);
1011     }
1012 
1013     @Override
1014     protected Object filterOutboundMessage(Object msg) {
1015         if (msg instanceof ByteBuf) {
1016             ByteBuf buf = (ByteBuf) msg;
1017             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1018         }
1019 
1020         throw new UnsupportedOperationException("unsupported message type");
1021     }
1022 
1023     @Override
1024     protected void doRegister(ChannelPromise promise) {
1025         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1026         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1027             if (f.isSuccess()) {
1028                 registration = (IoUringIoRegistration) f.getNow();
1029                 promise.setSuccess();
1030             } else {
1031                 promise.setFailure(f.cause());
1032             }
1033         });
1034     }
1035 
1036     @Override
1037     protected final void doDeregister() {
1038         if (registration == null) {
1039             return;
1040         }
1041         int fd = fd().intValue();
1042         if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
1043             registration.submit(IoUringIoOps.newPollRemove(
1044                     fd, 0, pollRdhupId, (short) Native.POLLRDHUP));
1045         }
1046         if ((ioState & POLL_IN_SCHEDULED) != 0) {
1047             registration.submit(IoUringIoOps.newPollRemove(
1048                     fd, 0, pollInId, (short) Native.POLLIN));
1049         }
1050         if ((ioState & POLL_OUT_SCHEDULED) != 0) {
1051             registration.submit(IoUringIoOps.newPollRemove(
1052                     fd,  0, pollOutId, (short) Native.POLLOUT));
1053         }
1054 
1055         registration = null;
1056     }
1057 
1058     @Override
1059     protected void doBind(final SocketAddress local) throws Exception {
1060         if (local instanceof InetSocketAddress) {
1061             checkResolvable((InetSocketAddress) local);
1062         }
1063         socket.bind(local);
1064         this.local = socket.localAddress();
1065     }
1066 
1067     protected static void checkResolvable(InetSocketAddress addr) {
1068         if (addr.isUnresolved()) {
1069             throw new UnresolvedAddressException();
1070         }
1071     }
1072 
1073     @Override
1074     protected final SocketAddress localAddress0() {
1075         return local;
1076     }
1077 
1078     @Override
1079     protected final SocketAddress remoteAddress0() {
1080         return remote;
1081     }
1082 
1083     private static boolean isAllowHalfClosure(ChannelConfig config) {
1084         return config instanceof SocketChannelConfig &&
1085                ((SocketChannelConfig) config).isAllowHalfClosure();
1086     }
1087 
1088     private void cancelConnectTimeoutFuture() {
1089         if (connectTimeoutFuture != null) {
1090             connectTimeoutFuture.cancel(false);
1091             connectTimeoutFuture = null;
1092         }
1093     }
1094 
1095     private void computeRemote() {
1096         if (requestedRemoteAddress instanceof InetSocketAddress) {
1097             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1098         }
1099     }
1100 
1101     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1102         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1103     }
1104 
1105     private void clearPollFlag(int pollMask) {
1106         if ((pollMask & Native.POLLIN) != 0) {
1107             ioState &= ~POLL_IN_SCHEDULED;
1108         }
1109         if ((pollMask & Native.POLLOUT) != 0) {
1110             ioState &= ~POLL_OUT_SCHEDULED;
1111         }
1112         if ((pollMask & Native.POLLRDHUP) != 0) {
1113             ioState &= ~POLL_RDHUP_SCHEDULED;
1114         }
1115      }
1116 }