View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.Errors;
43  import io.netty.channel.unix.FileDescriptor;
44  import io.netty.channel.unix.UnixChannel;
45  import io.netty.channel.unix.UnixChannelUtil;
46  import io.netty.util.ReferenceCountUtil;
47  import io.netty.util.concurrent.PromiseNotifier;
48  import io.netty.util.internal.logging.InternalLogger;
49  import io.netty.util.internal.logging.InternalLoggerFactory;
50  
51  import java.io.IOException;
52  import java.net.InetSocketAddress;
53  import java.net.SocketAddress;
54  import java.nio.ByteBuffer;
55  import java.nio.channels.AlreadyConnectedException;
56  import java.nio.channels.ClosedChannelException;
57  import java.nio.channels.ConnectionPendingException;
58  import java.nio.channels.NotYetConnectedException;
59  import java.nio.channels.UnresolvedAddressException;
60  import java.util.concurrent.ScheduledFuture;
61  import java.util.concurrent.TimeUnit;
62  
63  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
64  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
65  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
66  import static io.netty.util.internal.ObjectUtil.checkNotNull;
67  
68  
69  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
    // Class logger.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
    // The socket backing this channel; fd() returns it and the submitted io_uring ops use its fd.
    final LinuxSocket socket;
    // Whether the channel is active. Set to true by the constructors when the channel is created already connected
    // and once a connect completed successfully; set to false by doClose(). Volatile for cross-thread visibility.
    protected volatile boolean active;

    // Different masks for outstanding I/O operations. They are combined in the ioState bitset below.
    // NOTE: bit 1 (value 2) is currently unused; the highest mask (1 << 6) still fits into the byte ioState.
    private static final int POLL_IN_SCHEDULED = 1;
    private static final int POLL_OUT_SCHEDULED = 1 << 2;
    private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
    private static final int WRITE_SCHEDULED = 1 << 4;
    private static final int READ_SCHEDULED = 1 << 5;
    private static final int CONNECT_SCHEDULED = 1 << 6;

    // Counter behind nextOpsId(). It wraps around as a short, and 0 is skipped because 0 means "none".
    private short opsId = Short.MIN_VALUE;

    // Ids returned by IoRegistration.submit(...) for the matching requests. They are kept so the requests can be
    // cancelled via IORING_OP_ASYNC_CANCEL; 0 means there is nothing to cancel.
    private long pollInId;
    private long pollOutId;
    private long pollRdhupId;
    private long connectId;

    // Bitset of the *_SCHEDULED masks above. A byte is enough for now.
    private byte ioState;

    // It's possible that multiple read / writes are issued. We need to keep track of these.
    // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
    // enough but let's be a bit more flexible for now.
    private short numOutstandingWrites;
    // A value of -1 means that multi-shot is used and so reads will be issued as long as the request is not canceled.
    private short numOutstandingReads;

    // Set by doBeginRead() when a read was requested; reset when reading is cleared or the read completions
    // consumed the request.
    private boolean readPending;
    // True while readComplete(...) runs. During that time doBeginRead() only records readPending and leaves
    // starting the next read to the completion handling.
    private boolean inReadComplete;
    // Whether the last read completion carried IORING_CQE_F_SOCK_NONEMPTY (only when supported by the kernel).
    // When set, doBeginReadNow() schedules a read directly instead of waiting for POLLIN.
    private boolean socketHasMoreData;
102 
103     private static final class DelayedClose {
104         private final ChannelPromise promise;
105         private final Throwable cause;
106         private final ClosedChannelException closeCause;
107 
108         DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
109             this.promise = promise;
110             this.cause = cause;
111             this.closeCause = closeCause;
112         }
113     }
    // Non-null once close(...) was requested. The real close is postponed while a READ or WRITE is still scheduled
    // (see canCloseNow()). Its promise is completed when the IORING_OP_CLOSE completion arrives.
    private DelayedClose delayedClose;
    // Once true, no more reads are scheduled: doBeginRead() / doBeginReadNow() return early. Set by shutdownInput.
    private boolean inputClosedSeenErrorOnRead;

    /**
     * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    // Presumably the pending connect-timeout task, cancelled via cancelConnectTimeoutFuture() — TODO confirm.
    private ScheduledFuture<?> connectTimeoutFuture;
    // NOTE(review): not referenced in the code visible here; presumably the address passed to connect — confirm.
    private SocketAddress requestedRemoteAddress;
    // Native memory, released with Buffer.free(...) once a connect completed or the channel frees its resources.
    private ByteBuffer remoteAddressMemory;
    // Released once a connect completed or the channel frees its resources.
    private MsgHdrMemoryArray msgHdrMemoryArray;

    // The io_uring registration. It is null if the channel was never registered; doClose() then uses a syscall.
    private IoRegistration registration;

    // Cached local / remote addresses; volatile for cross-thread visibility.
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
130 
131     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
132         super(parent);
133         this.socket = checkNotNull(socket, "fd");
134 
135         if (active) {
136             // Directly cache the remote and local addresses
137             // See https://github.com/netty/netty/issues/2359
138             this.active = true;
139             this.local = socket.localAddress();
140             this.remote = socket.remoteAddress();
141         }
142 
143         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
144     }
145 
146     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
147         super(parent);
148         this.socket = checkNotNull(fd, "fd");
149         this.active = true;
150 
151         // Directly cache the remote and local addresses
152         // See https://github.com/netty/netty/issues/2359
153         this.remote = remote;
154         this.local = fd.localAddress();
155     }
156 
157     // Called once a Channel changed from AUTO_READ=true to AUTO_READ=false
158     final void autoReadCleared() {
159         if (!isRegistered()) {
160             return;
161         }
162         readPending = false;
163         IoRegistration registration = this.registration;
164         if (registration == null || !registration.isValid()) {
165             return;
166         }
167         if (eventLoop().inEventLoop()) {
168             clearRead();
169         } else {
170             eventLoop().execute(this::clearRead);
171         }
172     }
173 
174     private void clearRead() {
175         assert eventLoop().inEventLoop();
176         readPending = false;
177         IoRegistration registration = this.registration;
178         if (registration == null || !registration.isValid()) {
179             return;
180         }
181         if ((ioState & POLL_IN_SCHEDULED) != 0) {
182             // There was a POLLIN scheduled, let's cancel it so we are not notified of any more reads for now.
183             assert pollInId != 0;
184             long id = registration.submit(
185                     IoUringIoOps.newAsyncCancel((byte) 0, pollInId, Native.IORING_OP_POLL_ADD));
186             assert id != 0;
187             ioState &= ~POLL_IN_SCHEDULED;
188         }
189         // Also cancel all outstanding reads as the user did signal there is no more desire to read.
190         cancelOutstandingReads(registration(), numOutstandingReads);
191     }
192 
193     /**
194      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
195      *
196      * @return  opsId
197      */
198     protected final short nextOpsId() {
199         short id = opsId++;
200 
201         // We use 0 for "none".
202         if (id == 0) {
203             id = opsId++;
204         }
205         return id;
206     }
207 
208     public final boolean isOpen() {
209         return socket.isOpen();
210     }
211 
212     @Override
213     public boolean isActive() {
214         return active;
215     }
216 
217     @Override
218     public final FileDescriptor fd() {
219         return socket;
220     }
221 
222     private AbstractUringUnsafe ioUringUnsafe() {
223         return (AbstractUringUnsafe) unsafe();
224     }
225 
226     @Override
227     protected boolean isCompatible(final EventLoop loop) {
228         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
229     }
230 
231     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
232         return newDirectBuffer(buf, buf);
233     }
234 
235     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
236         final int readableBytes = buf.readableBytes();
237         if (readableBytes == 0) {
238             ReferenceCountUtil.release(holder);
239             return Unpooled.EMPTY_BUFFER;
240         }
241 
242         final ByteBufAllocator alloc = alloc();
243         if (alloc.isDirectBufferPooled()) {
244             return newDirectBuffer0(holder, buf, alloc, readableBytes);
245         }
246 
247         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
248         if (directBuf == null) {
249             return newDirectBuffer0(holder, buf, alloc, readableBytes);
250         }
251 
252         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
253         ReferenceCountUtil.safeRelease(holder);
254         return directBuf;
255     }
256 
257     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
258         final ByteBuf directBuf = alloc.directBuffer(capacity);
259         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
260         ReferenceCountUtil.safeRelease(holder);
261         return directBuf;
262     }
263 
    /**
     * Cancel all outstanding reads.
     * <p>
     * Called from {@code clearRead()} when reading is no longer wanted, and from {@code cancelOps(...)} while the
     * channel is being closed.
     *
     * @param registration          the {@link IoRegistration}.
     * @param numOutstandingReads   the number of outstanding reads, or {@code -1} if multi-shot was used.
     */
    protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
271 
    /**
     * Cancel all outstanding writes.
     * <p>
     * Called from {@code cancelOps(...)} while the channel is being closed.
     *
     * @param registration          the {@link IoRegistration}.
     * @param numOutstandingWrites  the number of outstanding writes.
     */
    protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
279 
    /**
     * Disconnect does nothing at this level; subclasses may override it.
     */
    @Override
    protected void doDisconnect() throws Exception {
    }
283 
284     private void freeRemoteAddressMemory() {
285         if (remoteAddressMemory != null) {
286             Buffer.free(remoteAddressMemory);
287             remoteAddressMemory = null;
288         }
289     }
290 
291     private void freeMsgHdrArray() {
292         if (msgHdrMemoryArray != null) {
293             msgHdrMemoryArray.release();
294             msgHdrMemoryArray = null;
295         }
296     }
297 
298     @Override
299     protected void doClose() throws Exception {
300         active = false;
301 
302         if (registration != null) {
303             if (socket.markClosed()) {
304                 int fd = fd().intValue();
305                 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
306                 registration.submit(ops);
307             }
308         } else {
309             // This one was never registered just use a syscall to close.
310             socket.close();
311             ioUringUnsafe().freeResourcesNowIfNeeded(null);
312         }
313     }
314 
315     @Override
316     protected final void doBeginRead() {
317         if (inputClosedSeenErrorOnRead) {
318             // We did see an error while reading and so closed the input. Stop reading.
319             return;
320         }
321         if (readPending) {
322             // We already have a read pending.
323             return;
324         }
325         readPending = true;
326         if (inReadComplete || !isActive()) {
327             // We are currently in the readComplete(...) callback which might issue more reads by itself.
328             // If readComplete(...) will not issue more reads itself it will pick up the readPending flag, reset it and
329             // call doBeginReadNow().
330             return;
331         }
332         doBeginReadNow();
333     }
334 
335     private void doBeginReadNow() {
336         if (inputClosedSeenErrorOnRead) {
337             // We did see an error while reading and so closed the input.
338             return;
339         }
340         if (!isPollInFirst() ||
341                 // If the socket was not empty, and we stopped reading we need to ensure we just force the
342                 // read as POLLIN might be edge-triggered (in case of POLL_ADD_MULTI).
343                 socketHasMoreData) {
344             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
345             ioUringUnsafe().scheduleFirstReadIfNeeded();
346         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
347             ioUringUnsafe().schedulePollIn();
348         }
349     }
350 
351     @Override
352     protected void doWrite(ChannelOutboundBuffer in) {
353         scheduleWriteIfNeeded(in, true);
354     }
355 
356     protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
357         if ((ioState & WRITE_SCHEDULED) != 0) {
358             return;
359         }
360         if (scheduleWrite(in) > 0) {
361             ioState |= WRITE_SCHEDULED;
362             if (submitAndRunNow && !isWritable()) {
363                 submitAndRunNow();
364             }
365         }
366     }
367 
    /**
     * Hook invoked by {@link #scheduleWriteIfNeeded(ChannelOutboundBuffer, boolean)} when a write was scheduled,
     * immediate submission was requested and the channel is not writable. Does nothing by default.
     */
    protected void submitAndRunNow() {
        // NOOP
    }
371 
372     private int scheduleWrite(ChannelOutboundBuffer in) {
373         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
374             return 0;
375         }
376         if (in == null) {
377             return 0;
378         }
379 
380         int msgCount = in.size();
381         if (msgCount == 0) {
382             return 0;
383         }
384         Object msg = in.current();
385 
386         if (msgCount > 1 && in.current() instanceof ByteBuf) {
387             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
388         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
389                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
390             // We also need some special handling for CompositeByteBuf
391             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
392         } else {
393             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
394         }
395         // Ensure we never overflow
396         assert numOutstandingWrites > 0;
397         return numOutstandingWrites;
398     }
399 
400     protected final IoRegistration registration() {
401         assert registration != null;
402         return registration;
403     }
404 
405     private void schedulePollOut() {
406         pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
407     }
408 
409     final void schedulePollRdHup() {
410         pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
411     }
412 
413     private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
414         assert (ioState & ioMask) == 0;
415         int fd = fd().intValue();
416         IoRegistration registration = registration();
417         IoUringIoOps ops = IoUringIoOps.newPollAdd(
418                 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
419         long id = registration.submit(ops);
420         if (id != 0) {
421             ioState |= (byte) ioMask;
422         }
423         return id;
424     }
425 
426     final void resetCachedAddresses() {
427         local = socket.localAddress();
428         remote = socket.remoteAddress();
429     }
430 
431     abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
        // Lazily created by recvBufAllocHandle(), wrapping the handle of the configured RecvByteBufAllocator.
        private IoUringRecvByteAllocatorHandle allocHandle;
        // Set once an IORING_OP_CLOSE completion that was not cancelled has been handled.
        private boolean closed;
        // Guards freeResourcesNowIfNeeded(...) so the resources are freed at most once.
        private boolean freed;
        // Updated on every read completion from the completion flags via socketIsEmpty(flags).
        private boolean socketIsEmpty;
436 
        /**
         * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         * <p>
         * Used by the enclosing channel when more than one message is flushed and the current one is a
         * {@link ByteBuf}, or when the current message spans several NIO buffers. The result is narrowed to
         * {@code short} and stored as the number of outstanding writes; it is expected to be positive.
         *
         * @param in    the {@link ChannelOutboundBuffer} holding the flushed messages.
         * @return      the number of expected write completions.
         */
        protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
442 
        /**
         * Schedule the write of a single message and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         * <p>
         * The result is narrowed to {@code short} and stored as the number of outstanding writes; it is expected to
         * be positive.
         *
         * @param msg   the current message of the outbound buffer.
         * @return      the number of expected write completions.
         */
        protected abstract int scheduleWriteSingle(Object msg);
448 
449         @Override
450         public final void handle(IoRegistration registration, IoEvent ioEvent) {
451             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
452             byte op = event.opcode();
453             int res = event.res();
454             int flags = event.flags();
455             short data = event.data();
456             switch (op) {
457                 case Native.IORING_OP_RECV:
458                 case Native.IORING_OP_ACCEPT:
459                 case Native.IORING_OP_RECVMSG:
460                 case Native.IORING_OP_READ:
461                     readComplete(op, res, flags, data);
462                     break;
463                 case Native.IORING_OP_WRITEV:
464                 case Native.IORING_OP_SEND:
465                 case Native.IORING_OP_SENDMSG:
466                 case Native.IORING_OP_WRITE:
467                 case Native.IORING_OP_SPLICE:
468                     writeComplete(op, res, flags, data);
469                     break;
470                 case Native.IORING_OP_POLL_ADD:
471                     pollAddComplete(res, flags, data);
472                     break;
473                 case Native.IORING_OP_ASYNC_CANCEL:
474                     cancelComplete0(op, res, flags, data);
475                     break;
476                 case Native.IORING_OP_CONNECT:
477                     connectComplete(op, res, flags, data);
478 
479                     // once the connect was completed we can also free some resources that are not needed anymore.
480                     freeMsgHdrArray();
481                     freeRemoteAddressMemory();
482                     break;
483                 case Native.IORING_OP_CLOSE:
484                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
485                         if (delayedClose != null) {
486                             delayedClose.promise.setSuccess();
487                         }
488                         closed = true;
489                     }
490                     break;
491                 default:
492                     break;
493             }
494 
495             // We delay the actual close if there is still a write or read scheduled, let's see if there
496             // was a close that needs to be done now.
497             handleDelayedClosed();
498 
499             if (ioState == 0 && closed) {
500                 freeResourcesNowIfNeeded(registration);
501             }
502         }
503 
504         private void freeResourcesNowIfNeeded(IoRegistration reg) {
505             if (!freed) {
506                 freed = true;
507                 freeResourcesNow(reg);
508             }
509         }
510 
511         /**
512          * Free all resources now. No new IO will be submitted for this channel via io_uring
513          *
514          * @param reg   the {@link IoRegistration} or {@code null} if it was never registered
515          */
516         protected void freeResourcesNow(IoRegistration reg) {
517             freeMsgHdrArray();
518             freeRemoteAddressMemory();
519             if (reg != null) {
520                 reg.cancel();
521             }
522         }
523 
524         private void handleDelayedClosed() {
525             if (delayedClose != null && canCloseNow()) {
526                 closeNow();
527             }
528         }
529 
530         private void pollAddComplete(int res, int flags, short data) {
531             if ((res & Native.POLLOUT) != 0) {
532                 pollOut(res);
533             }
534             if ((res & Native.POLLIN) != 0) {
535                 pollIn(res, flags, data);
536             }
537             if ((res & Native.POLLRDHUP) != 0) {
538                 pollRdHup(res);
539             }
540         }
541 
542         @Override
543         public final void close() throws Exception {
544             close(voidPromise());
545         }
546 
547         @Override
548         protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
549             if (closeFuture().isDone()) {
550                 // Closed already before.
551                 safeSetSuccess(promise);
552                 return;
553             }
554             if (delayedClose == null) {
555                 // We have a write operation pending that should be completed asap.
556                 // We will do the actual close operation one this write result is returned as otherwise
557                 // we may get into trouble as we may close the fd while we did not process the write yet.
558                 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
559             } else {
560                 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
561                 return;
562             }
563 
564             boolean cancelConnect = false;
565             try {
566                 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
567                 if (connectPromise != null) {
568                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
569                     connectPromise.tryFailure(new ClosedChannelException());
570                     AbstractIoUringChannel.this.connectPromise = null;
571                     cancelConnect = true;
572                 }
573 
574                 cancelConnectTimeoutFuture();
575             } finally {
576                 // It's important we cancel all outstanding connect, write and read operations now so
577                 // we will be able to process a delayed close if needed.
578                 cancelOps(cancelConnect);
579             }
580 
581             if (canCloseNow()) {
582                 // Currently there are is no WRITE and READ scheduled so we can start to teardown the channel.
583                 closeNow();
584             }
585         }
586 
587         private void cancelOps(boolean cancelConnect) {
588             if (registration == null || !registration.isValid()) {
589                 return;
590             }
591             byte flags = (byte) 0;
592             if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
593                 long id = registration.submit(
594                         IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
595                 assert id != 0;
596                 pollRdhupId = 0;
597             }
598             if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
599                 long id = registration.submit(
600                         IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
601                 assert id != 0;
602                 pollInId = 0;
603             }
604             if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
605                 long id = registration.submit(
606                         IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
607                 assert id != 0;
608                 pollOutId = 0;
609             }
610             if (cancelConnect && connectId != 0) {
611                 // Best effort to cancel the already submitted connect request.
612                 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
613                 assert id != 0;
614                 connectId = 0;
615             }
616             cancelOutstandingReads(registration, numOutstandingReads);
617             cancelOutstandingWrites(registration, numOutstandingWrites);
618         }
619 
620         private boolean canCloseNow() {
621             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
622             // problems related to re-ordering of completions.
623             return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
624         }
625 
626         private void closeNow() {
627             super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
628         }
629 
630         @Override
631         protected final void flush0() {
632             // Flush immediately only when there's no pending flush.
633             // If there's a pending flush operation, event loop will call forceFlush() later,
634             // and thus there's no need to call it now.
635             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
636                 super.flush0();
637             }
638         }
639 
640         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
641             if (promise == null) {
642                 // Closed via cancellation and the promise has been notified already.
643                 return;
644             }
645 
646             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
647             promise.tryFailure(cause);
648             closeIfClosed();
649         }
650 
651         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
652             if (promise == null) {
653                 // Closed via cancellation and the promise has been notified already.
654                 return;
655             }
656             active = true;
657 
658             if (local == null) {
659                 local = socket.localAddress();
660             }
661             computeRemote();
662 
663             // Register POLLRDHUP
664             schedulePollRdHup();
665 
666             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
667             // We still need to ensure we call fireChannelActive() in this case.
668             boolean active = isActive();
669 
670             // trySuccess() will return false if a user cancelled the connection attempt.
671             boolean promiseSet = promise.trySuccess();
672 
673             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
674             // because what happened is what happened.
675             if (!wasActive && active) {
676                 pipeline().fireChannelActive();
677             }
678 
679             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
680             if (!promiseSet) {
681                 close(voidPromise());
682             }
683         }
684 
685         @Override
686         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
687             if (allocHandle == null) {
688                 allocHandle = new IoUringRecvByteAllocatorHandle(
689                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
690             }
691             return allocHandle;
692         }
693 
694         final void shutdownInput(boolean allDataRead) {
695             logger.trace("shutdownInput Fd: {}", fd().intValue());
696             if (!socket.isInputShutdown()) {
697                 if (isAllowHalfClosure(config())) {
698                     try {
699                         socket.shutdown(true, false);
700                     } catch (IOException ignored) {
701                         // We attempted to shutdown and failed, which means the input has already effectively been
702                         // shutdown.
703                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
704                         return;
705                     } catch (NotYetConnectedException ignore) {
706                         // We attempted to shutdown and failed, which means the input has already effectively been
707                         // shutdown.
708                     }
709                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
710                 } else {
711                     // Handle this same way as if we did read all data so we don't schedule another read.
712                     inputClosedSeenErrorOnRead = true;
713                     close(voidPromise());
714                     return;
715                 }
716             }
717             if (allDataRead && !inputClosedSeenErrorOnRead) {
718                 inputClosedSeenErrorOnRead = true;
719                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
720             }
721         }
722 
723         private void fireEventAndClose(Object evt) {
724             pipeline().fireUserEventTriggered(evt);
725             close(voidPromise());
726         }
727 
728         final void schedulePollIn() {
729             assert (ioState & POLL_IN_SCHEDULED) == 0;
730             if (!isActive() || shouldBreakIoUringInReady(config())) {
731                 return;
732             }
733             pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, IoUring.isPollAddMultishotEnabled());
734         }
735 
        /**
         * Handles the completion of a read, for both the single-shot and the multi-shot (signalled by
         * {@code numOutstandingReads == -1}) variants. It updates the read bookkeeping, delegates to
         * {@link #readComplete0(byte, int, int, short, int)} and then decides whether to schedule a new read,
         * re-arm the multi-shot read or cancel it.
         *
         * @param op    the op code.
         * @param res   the result.
         * @param flags the flags of the completion.
         * @param data  the data that was passed when submitting the op.
         */
        private void readComplete(byte op, int res, int flags, short data) {
            assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;

            boolean multishot = numOutstandingReads == -1;
            // Without IORING_CQE_F_MORE no further completions will follow for this submission.
            boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
            if (rearm) {
                // Reset READ_SCHEDULED if there is nothing more to handle and so we need to re-arm. This works for
                // multi-shot and non multi-shot variants.
                ioState &= ~READ_SCHEDULED;
            }
            // Snapshot readPending before it may be reset below; it is consulted when the read was cancelled.
            boolean pending = readPending;
            if (multishot) {
                // Reset readPending so we can still keep track if we might need to cancel the multi-shot read or
                // not.
                readPending = false;
            } else if (--numOutstandingReads == 0) {
                // We received all outstanding completions.
                readPending = false;
                ioState &= ~READ_SCHEDULED;
            }
            inReadComplete = true;
            try {
                socketIsEmpty = socketIsEmpty(flags);
                socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
                        (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
                readComplete0(op, res, flags, data, numOutstandingReads);
            } finally {
                try {
                    // Check if we should consider the read loop to be done.
                    if (recvBufAllocHandle().isReadComplete()) {
                        // Reset the handle as we are done with the read-loop.
                        recvBufAllocHandle().reset(config());

                        // Check if this was a readComplete(...) triggered by a read or multi-shot read.
                        if (!multishot) {
                            if (readPending) {
                                // This was a "normal" read and the user did signal we should continue reading.
                                // Let's schedule the read now.
                                doBeginReadNow();
                            }
                        } else {
                            // The readComplete(...) was triggered by a multi-shot read. Because of this the state
                            // machine is a bit more complicated.

                            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                                // The readComplete(...) was triggered because the previous read was cancelled.
                                // In this case we need to check if the user did signal the desire to read again
                                // in the meantime. If this is the case we need to schedule the read to ensure
                                // we do not stall.
                                if (pending) {
                                    doBeginReadNow();
                                }
                            } else if (rearm) {
                                // We need to rearm the multishot as otherwise we might miss some data.
                                doBeginReadNow();
                            } else if (!readPending) {
                                // Cancel the multi-shot read now as the user did not signal that we want to keep
                                // reading while we handle the completion event.
                                cancelOutstandingReads(registration, numOutstandingReads);
                            }
                        }
                    } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                        // The readComplete(...) was triggered because the previous read was cancelled.
                        // In this case we need to check if the user did signal the desire to read again
                        // in the meantime. If this is the case we need to schedule the read to ensure
                        // we do not stall.
                        if (pending) {
                            doBeginReadNow();
                        }
                    } else if (multishot && rearm) {
                        // We need to rearm the multishot as otherwise we might miss some data.
                        doBeginReadNow();
                    }
                } finally {
                    inReadComplete = false;
                    socketIsEmpty = false;
                }
            }
        }
815 
        /**
         * Called once a read was completed.
         *
         * @param op                    the op code.
         * @param res                   the result.
         * @param flags                 the flags of the completion.
         * @param data                  the data that was passed when submitting the op.
         * @param outstandingCompletes  the number of read completions still expected after this one, or
         *                              {@code -1} while a multi-shot read is in flight.
         */
        protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
820 
821         /**
822          * Called once POLLRDHUP event is ready to be processed
823          */
824         private void pollRdHup(int res) {
825             ioState &= ~POLL_RDHUP_SCHEDULED;
826             pollRdhupId = 0;
827             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
828                 return;
829             }
830 
831             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
832             recvBufAllocHandle().rdHupReceived();
833 
834             if (isActive()) {
835                 scheduleFirstReadIfNeeded();
836             } else {
837                 // Just to be safe make sure the input marked as closed.
838                 shutdownInput(false);
839             }
840         }
841 
842         /**
843          * Called once POLLIN event is ready to be processed
844          */
845         private void pollIn(int res, int flags, short data) {
846             // Check if we need to rearm. This works for both cases, POLL_ADD and POLL_ADD_MULTI.
847             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
848             if (rearm) {
849                 ioState &= ~POLL_IN_SCHEDULED;
850                 pollInId = 0;
851             }
852             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
853                 return;
854             }
855             if (!readPending) {
856                 // We received the POLLIN but the user is not interested yet in reading, just mark socketHasMoreData
857                 // as true so we will trigger a read directly once the user calls read()
858                 socketHasMoreData = true;
859                 return;
860             }
861             scheduleFirstReadIfNeeded();
862         }
863 
864         private void scheduleFirstReadIfNeeded() {
865             if ((ioState & READ_SCHEDULED) == 0) {
866                 scheduleFirstRead();
867             }
868         }
869 
870         private void scheduleFirstRead() {
871             // This is a new "read loop" so we need to reset the allocHandle.
872             final ChannelConfig config = config();
873             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
874             allocHandle.reset(config);
875             scheduleRead(true);
876         }
877 
878         protected final void scheduleRead(boolean first) {
879             // Only schedule another read if the fd is still open.
880             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
881                 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
882                 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
883                     ioState |= READ_SCHEDULED;
884                 }
885             }
886         }
887 
        /**
         * Schedules a read and returns the number of {@link #readComplete(byte, int, int, short)}
         * calls that are expected because of the scheduled read.
         *
         * @param first             {@code true} if this is the first read of a read loop.
         * @param socketIsEmpty     {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
         * @return                  the number of {@link #readComplete(byte, int, int, short)} calls expected, or
         *                          {@code -1} if {@link #readComplete(byte, int, int, short)} is called until
         *                          the read is cancelled (multi-shot). The caller treats any other value
         *                          (e.g. {@code 0}) as "no read scheduled".
         */
        protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
899 
        /**
         * Called once a POLLOUT event is ready to be processed.
         *
         * <p>If a connect is pending, this tries to finish it. When the connect is still in progress, POLLOUT is
         * armed again. Otherwise the connect promise is completed and the connect state is cleared. Without a
         * pending connect, the write is retried unless the output has been shut down.
         *
         * @param res   the result.
         */
        private void pollOut(int res) {
            ioState &= ~POLL_OUT_SCHEDULED;
            pollOutId = 0;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                return;
            }
            // pending connect
            if (connectPromise != null) {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.

                assert eventLoop().inEventLoop();

                // Tells the finally block whether to re-arm POLLOUT (still connecting) or to clean up.
                boolean connectStillInProgress = false;
                try {
                    boolean wasActive = isActive();
                    if (!socket.finishConnect()) {
                        connectStillInProgress = true;
                        return;
                    }
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                    if (!connectStillInProgress) {
                        // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0
                        // is used
                        // See https://github.com/netty/netty/issues/1770
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                    } else {
                        // The connect was not done yet, register for POLLOUT again
                        schedulePollOut();
                    }
                }
            } else if (!socket.isOutputShutdown()) {
                // Try writing again
                super.flush0();
            }
        }
945 
        /**
         * Called once a write was completed.
         *
         * <p>While {@code CONNECT_SCHEDULED} is set, this completion belongs to the {@code sendmsg(...)} that was
         * submitted with {@code MSG_FASTOPEN} for {@code TCP_FASTOPEN_CONNECT}. In that case it is handled as a
         * connect completion. Otherwise the outstanding-write bookkeeping is updated, and either POLLOUT is armed
         * (not everything written) or the next write is scheduled.
         *
         * @param op    the op code.
         * @param res   the result.
         * @param flags the flags.
         * @param data  the data that was passed when submitting the op.
         */
        private void writeComplete(byte op, int res, int flags, short data) {
            if ((ioState & CONNECT_SCHEDULED) != 0) {
                // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
                // TCP_FASTOPEN_CONNECT.
                freeMsgHdrArray();
                if (res > 0) {
                    // Connect complete! The first res bytes were already sent, so drop them from the buffer.
                    outboundBuffer().removeBytes(res);

                    // Explicitly pass in 0 as this is what a connect(...) call returns when it was successful.
                    connectComplete(op, 0, flags, data);
                } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
                    // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
                    // In this case, our TCP connection will be established normally, but no data was transmitted at
                    // this time. We'll just transmit the data with normal writes later.
                    // Let's submit a normal connect.
                    submitConnect((InetSocketAddress) requestedRemoteAddress);
                } else {
                    // There was an error, handle it as a normal connect error.
                    connectComplete(op, res, flags, data);
                }
                return;
            }
            assert numOutstandingWrites > 0;
            --numOutstandingWrites;

            boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
            if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {

                // We were not able to write everything, let's register for POLLOUT
                schedulePollOut();
            }

            // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
            // while still removing messages internally in removeBytes(...) which then may corrupt state.
            if (numOutstandingWrites == 0) {
                ioState &= ~WRITE_SCHEDULED;

                // If we could write all and we did not schedule a pollout yet let us try to write again
                if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
                    scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
                }
            }
        }
998 
        /**
         * Called once a write was completed.
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         * @param outstanding   the number of write completions still expected after this one.
         * @return              {@code true} if everything could be written; {@code false} otherwise, in which
         *                      case the caller registers for POLLOUT (unless one is already scheduled).
         */
        abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1008 
        /**
         * Called once a cancel was completed. Does nothing by default; subclasses may override it to react to
         * the cancellation result.
         *
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         */
        void cancelComplete0(byte op, int res, int flags, short data) {
            // NOOP
        }
1020 
        /**
         * Called once a connect was completed.
         *
         * <p>{@code EINPROGRESS} and {@code EALREADY} mean the connection is not established yet, so POLLOUT is
         * armed to wait for it. Any other result completes the connect promise: success for {@code 0},
         * failure with the matching connect exception otherwise. In both cases the connect timeout is then
         * cancelled and the pending promise is cleared.
         *
         * @param op            the op code.
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         */
        void connectComplete(byte op, int res, int flags, short data) {
            ioState &= ~CONNECT_SCHEDULED;
            freeRemoteAddressMemory();

            if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
                // connect not complete yet need to wait for poll_out event
                schedulePollOut();
            } else {
                try {
                    if (res == 0) {
                        fulfillConnectPromise(connectPromise, active);
                        // Honour a read() that was requested while the connect was still in flight.
                        if (readPending) {
                            doBeginReadNow();
                        }
                    } else {
                        try {
                            // Converts the negative errno into the matching connect exception.
                            Errors.throwConnectException("io_uring connect", res);
                        } catch (Throwable cause) {
                            fulfillConnectPromise(connectPromise, cause);
                        }
                    }
                } finally {
                    // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
                    // used
                    // See https://github.com/netty/netty/issues/1770
                    cancelConnectTimeoutFuture();
                    connectPromise = null;
                }
            }
        }
1058 
        /**
         * Starts a non-blocking connect to {@code remoteAddress}, optionally binding to {@code localAddress}
         * first.
         *
         * <p>When TCP fast open is available on the client side and {@code TCP_FASTOPEN_CONNECT} is enabled, and
         * the first flushed message is a {@link ByteBuf}, that buffer is sent with a {@code MSG_FASTOPEN}
         * {@code sendmsg}. Otherwise a plain io_uring connect is submitted. On success the promise is stored,
         * together with an optional timeout and a cancellation listener. Validation or submission errors fail
         * {@code promise} directly.
         *
         * @param remoteAddress the address to connect to.
         * @param localAddress  the local address to bind to, or {@code null}.
         * @param promise       the promise to notify once the connect completes.
         */
        @Override
        public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
            // non-blocking io.
            if (promise.isDone() || !ensureOpen(promise)) {
                return;
            }

            if (delayedClose != null) {
                promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
                return;
            }
            try {
                if (connectPromise != null) {
                    throw new ConnectionPendingException();
                }
                if (localAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) remoteAddress);
                }

                if (remote != null) {
                    // Check if already connected before trying to connect. This is needed as connect(...) will not
                    // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
                    // EINPROGRESS and finished later.
                    throw new AlreadyConnectedException();
                }

                if (localAddress != null) {
                    socket.bind(localAddress);
                }

                InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;

                // With TCP_FASTOPEN_CONNECT the first flushed ByteBuf (if any) is sent as part of the connect.
                ByteBuf initialData = null;
                if (IoUring.isTcpFastOpenClientSideAvailable() &&
                        config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
                    ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
                    outbound.addFlush();
                    Object curr;
                    if ((curr = outbound.current()) instanceof ByteBuf) {
                        initialData = (ByteBuf) curr;
                    }
                }
                if (initialData != null) {
                    msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
                    MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
                    hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
                            initialData.readableBytes(), (short) 0);

                    int fd = fd().intValue();
                    IoRegistration registration = registration();
                    IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
                            hdr.address(), hdr.idx());
                    connectId = registration.submit(ops);
                    if (connectId == 0) {
                        // Directly release the memory if submitting failed.
                        freeMsgHdrArray();
                    }
                } else {
                    submitConnect(inetSocketAddress);
                }
                // NOTE(review): if submission failed (connectId == 0), CONNECT_SCHEDULED is not set but the promise
                // is still stored below, so it would only complete via the timeout/cancel paths - confirm intended.
                if (connectId != 0) {
                    ioState |= CONNECT_SCHEDULED;
                }
            } catch (Throwable t) {
                closeIfClosed();
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                return;
            }
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;
            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                        if (connectPromise != null && !connectPromise.isDone() &&
                                connectPromise.tryFailure(new ConnectTimeoutException(
                                        "connection timed out: " + remoteAddress))) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    // If the connect future is cancelled we also cancel the timeout and close the
                    // underlying socket.
                    if (future.isCancelled()) {
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
1164     }
1165 
1166     private void submitConnect(InetSocketAddress inetSocketAddress) {
1167         remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1168 
1169         SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1170 
1171         int fd = fd().intValue();
1172         IoRegistration registration = registration();
1173         IoUringIoOps ops = IoUringIoOps.newConnect(
1174                 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1175         connectId = registration.submit(ops);
1176         if (connectId == 0) {
1177             // Directly release the memory if submitting failed.
1178             freeRemoteAddressMemory();
1179         }
1180     }
1181 
1182     @Override
1183     protected Object filterOutboundMessage(Object msg) {
1184         if (msg instanceof ByteBuf) {
1185             ByteBuf buf = (ByteBuf) msg;
1186             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1187         }
1188         throw new UnsupportedOperationException("unsupported message type");
1189     }
1190 
1191     @Override
1192     protected void doRegister(ChannelPromise promise) {
1193         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1194         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1195             if (f.isSuccess()) {
1196                 registration = (IoRegistration) f.getNow();
1197                 promise.setSuccess();
1198             } else {
1199                 promise.setFailure(f.cause());
1200             }
1201         });
1202     }
1203 
1204     @Override
1205     protected final void doDeregister() {
1206         // Cancel all previous submitted ops.
1207         ioUringUnsafe().cancelOps(connectPromise != null);
1208     }
1209 
1210     @Override
1211     protected void doBind(final SocketAddress local) throws Exception {
1212         if (local instanceof InetSocketAddress) {
1213             checkResolvable((InetSocketAddress) local);
1214         }
1215         socket.bind(local);
1216         this.local = socket.localAddress();
1217     }
1218 
1219     protected static void checkResolvable(InetSocketAddress addr) {
1220         if (addr.isUnresolved()) {
1221             throw new UnresolvedAddressException();
1222         }
1223     }
1224 
1225     @Override
1226     protected final SocketAddress localAddress0() {
1227         return local;
1228     }
1229 
1230     @Override
1231     protected final SocketAddress remoteAddress0() {
1232         return remote;
1233     }
1234 
1235     private static boolean isAllowHalfClosure(ChannelConfig config) {
1236         return config instanceof SocketChannelConfig &&
1237                ((SocketChannelConfig) config).isAllowHalfClosure();
1238     }
1239 
1240     private void cancelConnectTimeoutFuture() {
1241         if (connectTimeoutFuture != null) {
1242             connectTimeoutFuture.cancel(false);
1243             connectTimeoutFuture = null;
1244         }
1245     }
1246 
1247     private void computeRemote() {
1248         if (requestedRemoteAddress instanceof InetSocketAddress) {
1249             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1250         }
1251     }
1252 
1253     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1254         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1255     }
1256 
    /**
     * Returns whether the socket is guaranteed to have been empty at the time the submitted io was executed and
     * its completion event was created.
     * @param flags     the flags that were part of the completion
     * @return          {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
     */
    protected abstract boolean socketIsEmpty(int flags);
1264 
    /**
     * Implemented by subclasses.
     * NOTE(review): the name suggests this decides whether a POLLIN should be armed before the first read is
     * attempted; confirm against the callers.
     */
    abstract boolean isPollInFirst();
1266 }