View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.DomainSocketAddress;
43  import io.netty.channel.unix.Errors;
44  import io.netty.channel.unix.FileDescriptor;
45  import io.netty.channel.unix.UnixChannel;
46  import io.netty.channel.unix.UnixChannelUtil;
47  import io.netty.util.ReferenceCountUtil;
48  import io.netty.util.concurrent.PromiseNotifier;
49  import io.netty.util.internal.CleanableDirectBuffer;
50  import io.netty.util.internal.StringUtil;
51  import io.netty.util.internal.logging.InternalLogger;
52  import io.netty.util.internal.logging.InternalLoggerFactory;
53  
54  import java.io.IOException;
55  import java.net.InetSocketAddress;
56  import java.net.SocketAddress;
57  import java.nio.ByteBuffer;
58  import java.nio.channels.AlreadyConnectedException;
59  import java.nio.channels.ClosedChannelException;
60  import java.nio.channels.ConnectionPendingException;
61  import java.nio.channels.NotYetConnectedException;
62  import java.nio.channels.UnresolvedAddressException;
63  import java.util.concurrent.ScheduledFuture;
64  import java.util.concurrent.TimeUnit;
65  
66  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
67  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
68  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
69  import static io.netty.util.internal.ObjectUtil.checkNotNull;
70  import static io.netty.util.internal.StringUtil.className;
71  
72  
73  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
    /** Logger of this class; used for trace-level messages. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
    /** The socket backing this channel; returned by {@link #fd()} and consulted by {@link #isOpen()}. */
    final LinuxSocket socket;
    /** Backing flag of {@link #isActive()}. */
    protected volatile boolean active;

    // Different masks for outstanding I/O operations. Each mask is a distinct bit that is set in / cleared from
    // ioState.
    private static final int POLL_IN_SCHEDULED = 1;
    private static final int POLL_OUT_SCHEDULED = 1 << 2;
    private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
    private static final int WRITE_SCHEDULED = 1 << 4;
    private static final int READ_SCHEDULED = 1 << 5;
    private static final int CONNECT_SCHEDULED = 1 << 6;

    // Source of the ids handed out by nextOpsId(); starts at the smallest short value.
    private short opsId = Short.MIN_VALUE;

    // Ids returned by IoRegistration.submit(...) for the respective outstanding operation. A value of 0 is treated
    // as "nothing to cancel" when operations are cancelled.
    private long pollInId;
    private long pollOutId;
    private long pollRdhupId;
    // NOTE(review): assigned outside of the visible part of this file - presumably when the connect is submitted.
    private long connectId;

    // Bit set of the *_SCHEDULED masks above. A byte is enough for now.
    private byte ioState;

    // It's possible that multiple read / writes are issued. We need to keep track of these.
    // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
    // enough but let's be a bit more flexible for now.
    private short numOutstandingWrites;
    // A value of -1 means that multi-shot is used and so reads will be issued as long as the request is not canceled.
    private short numOutstandingReads;

    // Set by doBeginRead() when a read was requested; cleared by clearRead() and while completions are handled.
    private boolean readPending;
    // True while a read completion is being processed; doBeginRead() then only records readPending.
    private boolean inReadComplete;
    // Set from the completion flags of the last read; when true doBeginReadNow() reads directly instead of polling.
    private boolean socketHasMoreData;
106 
107     private static final class DelayedClose {
108         private final ChannelPromise promise;
109         private final Throwable cause;
110         private final ClosedChannelException closeCause;
111 
112         DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
113             this.promise = promise;
114             this.cause = cause;
115             this.closeCause = closeCause;
116         }
117     }
    // Non-null once a close was requested through the Unsafe; the real close runs as soon as canCloseNow() allows it.
    private DelayedClose delayedClose;
    // When true, doBeginRead() and doBeginReadNow() return without scheduling any read.
    private boolean inputClosedSeenErrorOnRead;

    /**
     * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    // NOTE(review): the three fields below are assigned outside of the visible part of this file - presumably while
    // a connect is in flight. Confirm against the connect implementation.
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;
    // Cleaned and cleared by freeRemoteAddressMemory() whenever remoteAddressMemory is non-null.
    private CleanableDirectBuffer cleanable;
    private ByteBuffer remoteAddressMemory;
    // Released and cleared by freeMsgHdrArray().
    private MsgHdrMemoryArray msgHdrMemoryArray;

    // doClose() treats null as "never registered". NOTE(review): assigned outside of the visible part of this file.
    private IoRegistration registration;

    // Cached addresses, filled in by the constructors and refreshed by resetCachedAddresses().
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
135 
136     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
137         super(parent);
138         this.socket = checkNotNull(socket, "fd");
139 
140         if (active) {
141             // Directly cache the remote and local addresses
142             // See https://github.com/netty/netty/issues/2359
143             this.active = true;
144             this.local = socket.localAddress();
145             this.remote = socket.remoteAddress();
146         }
147 
148         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
149     }
150 
151     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
152         super(parent);
153         this.socket = checkNotNull(fd, "fd");
154         this.active = true;
155 
156         // Directly cache the remote and local addresses
157         // See https://github.com/netty/netty/issues/2359
158         this.remote = remote;
159         this.local = fd.localAddress();
160     }
161 
162     // Called once a Channel changed from AUTO_READ=true to AUTO_READ=false
163     final void autoReadCleared() {
164         if (!isRegistered()) {
165             return;
166         }
167         IoRegistration registration = this.registration;
168         if (registration == null || !registration.isValid()) {
169             return;
170         }
171         if (eventLoop().inEventLoop()) {
172             clearRead();
173         } else {
174             eventLoop().execute(this::clearRead);
175         }
176     }
177 
178     private void clearRead() {
179         assert eventLoop().inEventLoop();
180         readPending = false;
181         IoRegistration registration = this.registration;
182         if (registration == null || !registration.isValid()) {
183             return;
184         }
185         // Also cancel all outstanding reads as the user did signal there is no more desire to read.
186         cancelOutstandingReads(registration(), numOutstandingReads);
187     }
188 
189     /**
190      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
191      *
192      * @return  opsId
193      */
194     protected final short nextOpsId() {
195         short id = opsId++;
196 
197         // We use 0 for "none".
198         if (id == 0) {
199             id = opsId++;
200         }
201         return id;
202     }
203 
204     public final boolean isOpen() {
205         return socket.isOpen();
206     }
207 
    /**
     * Returns the current value of the {@code active} flag.
     */
    @Override
    public boolean isActive() {
        return active;
    }
212 
    /**
     * Returns the socket of this channel as {@link FileDescriptor}.
     */
    @Override
    public final FileDescriptor fd() {
        return socket;
    }
217 
    /**
     * Returns {@link #unsafe()} cast to {@link AbstractUringUnsafe}.
     */
    private AbstractUringUnsafe ioUringUnsafe() {
        return (AbstractUringUnsafe) unsafe();
    }
221 
222     @Override
223     protected boolean isCompatible(final EventLoop loop) {
224         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
225     }
226 
    /**
     * Same as {@link #newDirectBuffer(Object, ByteBuf)} with the buffer itself used as the holder.
     */
    protected final ByteBuf newDirectBuffer(ByteBuf buf) {
        return newDirectBuffer(buf, buf);
    }
230 
    /**
     * Returns whether the POLLIN poll may be submitted as multi-shot. The default follows
     * {@code IoUring.isPollAddMultishotEnabled()}; sub-classes may override it.
     */
    protected boolean allowMultiShotPollIn() {
        return IoUring.isPollAddMultishotEnabled();
    }
234 
235     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
236         final int readableBytes = buf.readableBytes();
237         if (readableBytes == 0) {
238             ReferenceCountUtil.release(holder);
239             return Unpooled.EMPTY_BUFFER;
240         }
241 
242         final ByteBufAllocator alloc = alloc();
243         if (alloc.isDirectBufferPooled()) {
244             return newDirectBuffer0(holder, buf, alloc, readableBytes);
245         }
246 
247         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
248         if (directBuf == null) {
249             return newDirectBuffer0(holder, buf, alloc, readableBytes);
250         }
251 
252         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
253         ReferenceCountUtil.safeRelease(holder);
254         return directBuf;
255     }
256 
257     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
258         final ByteBuf directBuf = alloc.directBuffer(capacity);
259         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
260         ReferenceCountUtil.safeRelease(holder);
261         return directBuf;
262     }
263 
    /**
     * Cancel all outstanding reads. Invoked when reading is stopped (see {@code clearRead()}) and when the channel
     * is being closed.
     *
     * @param registration          the {@link IoRegistration}.
     * @param numOutstandingReads   the number of outstanding reads, or {@code -1} if multi-shot was used.
     */
    protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
271 
    /**
     * Cancel all outstanding writes. Invoked when the channel is being closed.
     *
     * @param registration          the {@link IoRegistration}.
     * @param numOutstandingWrites  the number of outstanding writes.
     */
    protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
279 
    /**
     * Intentionally a no-op in this base class.
     */
    @Override
    protected void doDisconnect() throws Exception {
    }
283 
284     private void freeRemoteAddressMemory() {
285         if (remoteAddressMemory != null) {
286             cleanable.clean();
287             cleanable = null;
288             remoteAddressMemory = null;
289         }
290     }
291 
292     private void freeMsgHdrArray() {
293         if (msgHdrMemoryArray != null) {
294             msgHdrMemoryArray.release();
295             msgHdrMemoryArray = null;
296         }
297     }
298 
299     @Override
300     protected void doClose() throws Exception {
301         active = false;
302 
303         if (registration != null) {
304             if (socket.markClosed()) {
305                 int fd = fd().intValue();
306                 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
307                 registration.submit(ops);
308             }
309         } else {
310             // This one was never registered just use a syscall to close.
311             socket.close();
312             ioUringUnsafe().unregistered();
313         }
314     }
315 
316     @Override
317     protected final void doBeginRead() {
318         if (inputClosedSeenErrorOnRead) {
319             // We did see an error while reading and so closed the input. Stop reading.
320             return;
321         }
322         if (readPending) {
323             // We already have a read pending.
324             return;
325         }
326         readPending = true;
327         if (inReadComplete || !isActive()) {
328             // We are currently in the readComplete(...) callback which might issue more reads by itself.
329             // If readComplete(...) will not issue more reads itself it will pick up the readPending flag, reset it and
330             // call doBeginReadNow().
331             return;
332         }
333         doBeginReadNow();
334     }
335 
336     private void doBeginReadNow() {
337         if (inputClosedSeenErrorOnRead) {
338             // We did see an error while reading and so closed the input.
339             return;
340         }
341         if (!isPollInFirst() ||
342                 // If the socket was not empty, and we stopped reading we need to ensure we just force the
343                 // read as POLLIN might be edge-triggered (in case of POLL_ADD_MULTI).
344                 socketHasMoreData) {
345             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
346             ioUringUnsafe().scheduleFirstReadIfNeeded();
347         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
348             ioUringUnsafe().schedulePollIn();
349         }
350     }
351 
    /**
     * Schedules a write for the given {@link ChannelOutboundBuffer}, asking for an immediate submit when the
     * channel is not writable (see {@link #scheduleWriteIfNeeded(ChannelOutboundBuffer, boolean)}).
     */
    @Override
    protected void doWrite(ChannelOutboundBuffer in) {
        scheduleWriteIfNeeded(in, true);
    }
356 
357     protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
358         if ((ioState & WRITE_SCHEDULED) != 0) {
359             return;
360         }
361         if (scheduleWrite(in) > 0) {
362             ioState |= WRITE_SCHEDULED;
363             if (submitAndRunNow && !isWritable()) {
364                 submitAndRunNow();
365             }
366         }
367     }
368 
    /**
     * Hook that is called by {@link #scheduleWriteIfNeeded(ChannelOutboundBuffer, boolean)} after a write was
     * scheduled while the channel is not writable. Does nothing by default.
     */
    protected void submitAndRunNow() {
        // NOOP
    }
372 
373     private int scheduleWrite(ChannelOutboundBuffer in) {
374         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
375             return 0;
376         }
377         if (in == null) {
378             return 0;
379         }
380 
381         int msgCount = in.size();
382         if (msgCount == 0) {
383             return 0;
384         }
385         Object msg = in.current();
386 
387         if (msgCount > 1 && in.current() instanceof ByteBuf) {
388             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
389         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
390                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
391             // We also need some special handling for CompositeByteBuf
392             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
393         } else {
394             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
395         }
396         // Ensure we never overflow
397         assert numOutstandingWrites > 0;
398         return numOutstandingWrites;
399     }
400 
    /**
     * Returns the {@link IoRegistration} of this channel. Asserts that it is not {@code null}.
     */
    protected final IoRegistration registration() {
        assert registration != null;
        return registration;
    }
405 
    /**
     * Submits a single-shot poll for {@code POLLOUT} and remembers the returned id in {@code pollOutId}.
     */
    private void schedulePollOut() {
        pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
    }
409 
    /**
     * Submits a single-shot poll for {@code POLLRDHUP} and remembers the returned id in {@code pollRdhupId}.
     */
    final void schedulePollRdHup() {
        pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
    }
413 
414     private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
415         assert (ioState & ioMask) == 0;
416         int fd = fd().intValue();
417         IoRegistration registration = registration();
418         IoUringIoOps ops = IoUringIoOps.newPollAdd(
419                 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
420         long id = registration.submit(ops);
421         if (id != 0) {
422             ioState |= (byte) ioMask;
423         }
424         return id;
425     }
426 
    /**
     * Re-reads the local and remote address from the socket and replaces the cached values.
     */
    final void resetCachedAddresses() {
        local = socket.localAddress();
        remote = socket.remoteAddress();
    }
431 
432     protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
        // Lazily created by recvBufAllocHandle().
        private IoUringRecvByteAllocatorHandle allocHandle;
        // Set once the completion of the close operation was received (and it was not cancelled).
        private boolean closed;
        // Updated from the completion flags each time a read completion is handled.
        private boolean socketIsEmpty;
436 
        /**
         * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param in    the {@link ChannelOutboundBuffer} that holds the messages to write.
         * @return      the number of expected completions; the caller asserts that it is greater than {@code 0}.
         */
        protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
442 
        /**
         * Schedule the write of a single message and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param msg   the message to write.
         * @return      the number of expected completions; the caller asserts that it is greater than {@code 0}.
         */
        protected abstract int scheduleWriteSingle(Object msg);
448 
        /**
         * Dispatches a completion event to the matching handler based on its opcode. Afterwards a pending delayed
         * close is executed if possible, and the registration is cancelled once the channel is closed and no
         * operation is outstanding anymore.
         *
         * @param registration  the {@link IoRegistration} the event belongs to.
         * @param ioEvent       the completion event; it is cast to {@link IoUringIoEvent}.
         */
        @Override
        public final void handle(IoRegistration registration, IoEvent ioEvent) {
            IoUringIoEvent event = (IoUringIoEvent) ioEvent;
            byte op = event.opcode();
            int res = event.res();
            int flags = event.flags();
            short data = event.data();
            switch (op) {
                // All read-like operations share one completion handler.
                case Native.IORING_OP_RECV:
                case Native.IORING_OP_ACCEPT:
                case Native.IORING_OP_RECVMSG:
                case Native.IORING_OP_READ:
                    readComplete(op, res, flags, data);
                    break;
                // All write-like operations share one completion handler.
                case Native.IORING_OP_WRITEV:
                case Native.IORING_OP_SEND:
                case Native.IORING_OP_SENDMSG:
                case Native.IORING_OP_WRITE:
                case Native.IORING_OP_SPLICE:
                case Native.IORING_OP_SEND_ZC:
                case Native.IORING_OP_SENDMSG_ZC:
                    writeComplete(op, res, flags, data);
                    break;
                case Native.IORING_OP_POLL_ADD:
                    pollAddComplete(res, flags, data);
                    break;
                case Native.IORING_OP_ASYNC_CANCEL:
                    cancelComplete0(op, res, flags, data);
                    break;
                case Native.IORING_OP_CONNECT:
                    connectComplete(op, res, flags, data);

                    // once the connect was completed we can also free some resources that are not needed anymore.
                    freeMsgHdrArray();
                    freeRemoteAddressMemory();
                    break;
                case Native.IORING_OP_CLOSE:
                    // A cancelled close did not close anything, so neither notify the promise nor mark as closed.
                    if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
                        if (delayedClose != null) {
                            delayedClose.promise.setSuccess();
                        }
                        closed = true;
                    }
                    break;
                default:
                    // Completions of any other operation are ignored here.
                    break;
            }

            // We delay the actual close if there is still a write or read scheduled, let's see if there
            // was a close that needs to be done now.
            handleDelayedClosed();

            if (ioState == 0 && closed) {
                // Cancel the registration now.
                registration.cancel();
            }
        }
506 
        /**
         * Frees the message header array and the remote address memory held by the channel.
         */
        @Override
        public void unregistered() {
            freeMsgHdrArray();
            freeRemoteAddressMemory();
        }
512 
513         private void handleDelayedClosed() {
514             if (delayedClose != null && canCloseNow()) {
515                 closeNow();
516             }
517         }
518 
        /**
         * Handles the completion of a POLL_ADD operation. {@code res} is tested against each poll event bit
         * independently, so a single completion may trigger more than one of the handlers below.
         *
         * @param res   the result of the completion, tested as a bit set of poll events.
         * @param flags the flags of the completion.
         * @param data  the data of the completion.
         */
        private void pollAddComplete(int res, int flags, short data) {
            if ((res & Native.POLLOUT) != 0) {
                pollOut(res);
            }
            if ((res & Native.POLLIN) != 0) {
                pollIn(res, flags, data);
            }
            if ((res & Native.POLLRDHUP) != 0) {
                pollRdHup(res);
            }
        }
530 
        /**
         * Closes the channel using the void promise.
         */
        @Override
        public final void close() throws Exception {
            close(voidPromise());
        }
535 
        /**
         * Requests the close of the channel. The request is stored as delayed close, any pending connect attempt
         * is failed, all outstanding operations are cancelled and - if no read or write is scheduled anymore - the
         * channel is closed right away. Otherwise the close is executed later from
         * {@link #handle(IoRegistration, IoEvent)}.
         *
         * @param promise       notified once the close completed; if a close was requested before it is chained
         *                      to the promise of that earlier request.
         * @param cause         the cause that is handed to the real close operation.
         * @param closeCause    the {@link ClosedChannelException} that is handed to the real close operation.
         */
        @Override
        protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
            if (closeFuture().isDone()) {
                // Closed already before.
                safeSetSuccess(promise);
                return;
            }
            if (delayedClose == null) {
                // We might have a write operation pending that should be completed asap.
                // We will do the actual close operation once this write result is returned as otherwise
                // we may get into trouble as we may close the fd while we did not process the write yet.
                // A void promise can not be used to notify, so replace it with a real one.
                delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
            } else {
                // A close is in progress already, just notify the given promise once it is done.
                delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
                return;
            }

            boolean cancelConnect = false;
            try {
                ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                if (connectPromise != null) {
                    // Use tryFailure() instead of setFailure() to avoid the race against cancel().
                    connectPromise.tryFailure(new ClosedChannelException());
                    AbstractIoUringChannel.this.connectPromise = null;
                    cancelConnect = true;
                }

                cancelConnectTimeoutFuture();
            } finally {
                // It's important we cancel all outstanding connect, write and read operations now so
                // we will be able to process a delayed close if needed.
                cancelOps(cancelConnect);
            }

            if (canCloseNow()) {
                // Currently there is no WRITE and READ scheduled so we can start to tear down the channel.
                closeNow();
            }
        }
575 
        /**
         * Submits cancel requests for all outstanding poll operations, optionally for the connect, and asks the
         * channel implementation to cancel outstanding reads and writes. Does nothing without a valid registration.
         *
         * @param cancelConnect {@code true} if a submitted connect request should be cancelled as well.
         */
        private void cancelOps(boolean cancelConnect) {
            if (registration == null || !registration.isValid()) {
                return;
            }
            byte flags = (byte) 0;
            // For each poll: only cancel if it is marked as scheduled and we know its id. The id is reset
            // afterwards so the same operation is not cancelled twice.
            if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
                long id = registration.submit(
                        IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
                assert id != 0;
                pollRdhupId = 0;
            }
            if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
                long id = registration.submit(
                        IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
                assert id != 0;
                pollInId = 0;
            }
            if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
                long id = registration.submit(
                        IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
                assert id != 0;
                pollOutId = 0;
            }
            if (cancelConnect && connectId != 0) {
                // Best effort to cancel the already submitted connect request.
                long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
                assert id != 0;
                connectId = 0;
            }
            // Reads and writes are tracked by the concrete channel, so let it cancel them.
            cancelOutstandingReads(registration, numOutstandingReads);
            cancelOutstandingWrites(registration, numOutstandingWrites);
        }
608 
        /**
         * Returns {@code true} if {@link #canCloseNow0()} allows it and neither a write nor a read is scheduled.
         */
        private boolean canCloseNow() {
            // If there is currently no WRITE and READ scheduled, we can close the channel now without
            // problems related to re-ordering of completions.
            return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
        }
614 
        /**
         * Additional condition that must hold before the channel may be closed. Returns {@code true} by default;
         * sub-classes may override it to postpone the close.
         */
        protected boolean canCloseNow0() {
            return true;
        }
618 
        /**
         * Runs the close of the super class with the cause and close cause of the delayed close. A fresh promise
         * is used here; the promise of the delayed close is completed when the close completion is handled.
         * Requires {@code delayedClose} to be non-null.
         */
        private void closeNow() {
            super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
        }
622 
623         @Override
624         protected final void flush0() {
625             // Flush immediately only when there's no pending flush.
626             // If there's a pending flush operation, event loop will call forceFlush() later,
627             // and thus there's no need to call it now.
628             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
629                 super.flush0();
630             }
631         }
632 
633         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
634             if (promise == null) {
635                 // Closed via cancellation and the promise has been notified already.
636                 return;
637             }
638 
639             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
640             promise.tryFailure(cause);
641             closeIfClosed();
642         }
643 
        /**
         * Completes a successful connect: marks the channel active, caches the addresses, schedules the POLLRDHUP,
         * notifies the promise and fires {@code channelActive} if the channel became active by this connect.
         *
         * @param promise   the connect promise, {@code null} if it was notified already.
         * @param wasActive whether the channel was active before the connect attempt.
         */
        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }
            active = true;

            // Keep a local address that was cached before; only look it up if there is none yet.
            if (local == null) {
                local = socket.localAddress();
            }
            computeRemote();

            // Register POLLRDHUP
            schedulePollRdHup();

            // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
            // We still need to ensure we call fireChannelActive() in this case.
            boolean active = isActive();

            // trySuccess() will return false if a user cancelled the connection attempt.
            boolean promiseSet = promise.trySuccess();

            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }

            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }
677 
678         @Override
679         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
680             if (allocHandle == null) {
681                 allocHandle = new IoUringRecvByteAllocatorHandle(
682                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
683             }
684             return allocHandle;
685         }
686 
        /**
         * Shuts down the input side of the channel. With half-closure allowed the socket input is shut down and a
         * {@link ChannelInputShutdownEvent} is fired; otherwise the whole channel is closed.
         *
         * @param allDataRead   {@code true} if all data was read, in which case a
         *                      {@link ChannelInputShutdownReadComplete} event is fired (at most once) and no
         *                      further reads are scheduled.
         */
        final void shutdownInput(boolean allDataRead) {
            logger.trace("shutdownInput Fd: {}", fd().intValue());
            if (!socket.isInputShutdown()) {
                if (isAllowHalfClosure(config())) {
                    try {
                        socket.shutdown(true, false);
                    } catch (IOException ignored) {
                        // We attempted to shutdown and failed, which means the input has already effectively been
                        // shutdown.
                        fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
                        return;
                    } catch (NotYetConnectedException ignore) {
                        // We attempted to shutdown and failed, which means the input has already effectively been
                        // shutdown.
                    }
                    pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                } else {
                    // Handle this same way as if we did read all data so we don't schedule another read.
                    inputClosedSeenErrorOnRead = true;
                    close(voidPromise());
                    return;
                }
            }
            // The flag doubles as guard so the read-complete event is only fired once.
            if (allDataRead && !inputClosedSeenErrorOnRead) {
                inputClosedSeenErrorOnRead = true;
                pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
            }
        }
715 
        /**
         * Fires the given user event through the pipeline and closes the channel afterwards.
         *
         * @param evt   the user event to fire.
         */
        private void fireEventAndClose(Object evt) {
            pipeline().fireUserEventTriggered(evt);
            close(voidPromise());
        }
720 
        /**
         * Submits a poll for {@code POLLIN} (multi-shot if {@link #allowMultiShotPollIn()} permits) and remembers
         * the returned id in {@code pollInId}. Nothing is submitted if the channel is not active or if
         * {@code shouldBreakIoUringInReady(config())} returns {@code true}. Must only be called while no POLLIN is
         * scheduled.
         */
        final void schedulePollIn() {
            assert (ioState & POLL_IN_SCHEDULED) == 0;
            if (!isActive() || shouldBreakIoUringInReady(config())) {
                return;
            }
            pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
        }
728 
        /**
         * Handles the completion event of a previously scheduled read. Updates the bookkeeping of outstanding
         * reads, delegates the actual processing to {@link #readComplete0(byte, int, int, short, int)} and
         * afterwards decides if another read needs to be scheduled or an active multi-shot read needs to be
         * cancelled.
         *
         * @param op    the op code, passed through to {@code readComplete0(...)}.
         * @param res   the result; {@code Native.ERRNO_ECANCELED_NEGATIVE} signals that the read was cancelled.
         * @param flags the completion flags; inspected for {@code IORING_CQE_F_MORE} and
         *              {@code IORING_CQE_F_SOCK_NONEMPTY}.
         * @param data  the data, passed through to {@code readComplete0(...)}.
         */
        private void readComplete(byte op, int res, int flags, short data) {
            assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;

            // A value of -1 marks a multi-shot read, for which completions arrive until it is cancelled.
            boolean multishot = numOutstandingReads == -1;
            // Without IORING_CQE_F_MORE no further completions will follow for this submission.
            boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
            if (rearm) {
                // Reset READ_SCHEDULED if there is nothing more to handle and so we need to re-arm. This works for
                // multi-shot and non multi-shot variants.
                ioState &= ~READ_SCHEDULED;
            }
            // Snapshot readPending before it is reset below, it is needed to handle the cancelled case.
            boolean pending = readPending;
            if (multishot) {
                // Reset readPending so we can still keep track if we might need to cancel the multi-shot read or
                // not.
                readPending = false;
            } else if (--numOutstandingReads == 0) {
                // We received all outstanding completions.
                readPending = false;
                ioState &= ~READ_SCHEDULED;
            }
            inReadComplete = true;
            try {
                socketIsEmpty = socketIsEmpty(flags);
                socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
                        (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
                readComplete0(op, res, flags, data, numOutstandingReads);
            } finally {
                try {
                    // Check if we should consider the read loop to be done.
                    if (recvBufAllocHandle().isReadComplete()) {
                        // Reset the handle as we are done with the read-loop.
                        recvBufAllocHandle().reset(config());

                        // Check if this was a readComplete(...) triggered by a read or multi-shot read.
                        if (!multishot) {
                            if (readPending) {
                                // This was a "normal" read and the user did signal we should continue reading.
                                // Let's schedule the read now.
                                doBeginReadNow();
                            }
                        } else {
                            // The readComplete(...) was triggered by a multi-shot read. Because of this the state
                            // machine is a bit more complicated.

                            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                                // The readComplete(...) was triggered because the previous read was cancelled.
                                // In this case we need to check if the user did signal the desire to read again
                                // in the meantime. If this is the case we need to schedule the read to ensure
                                // we do not stall.
                                if (pending) {
                                    doBeginReadNow();
                                }
                            } else if (rearm) {
                                // We need to rearm the multishot as otherwise we might miss some data.
                                doBeginReadNow();
                            } else if (!readPending) {
                                // Cancel the multi-shot read now as the user did not signal that we want to keep
                                // reading while we handle the completion event.
                                cancelOutstandingReads(registration, numOutstandingReads);
                            }
                        }
                    } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                        // The readComplete(...) was triggered because the previous read was cancelled.
                        // In this case we need to check if the user did signal the desire to read again
                        // in the meantime. If this is the case we need to schedule the read to ensure
                        // we do not stall.
                        if (pending) {
                            doBeginReadNow();
                        }
                    } else if (multishot && rearm) {
                        // We need to rearm the multishot as otherwise we might miss some data.
                        doBeginReadNow();
                    }
                } finally {
                    // Always clear the per-completion state, even if one of the calls above failed.
                    inReadComplete = false;
                    socketIsEmpty = false;
                }
            }
        }
808 
        /**
         * Called once a read was completed. Invoked by {@link #readComplete(byte, int, int, short)} for every
         * read completion event.
         *
         * @param op                    the op code.
         * @param res                   the result.
         * @param flags                 the flags.
         * @param data                  the data that was passed when submitting the op.
         * @param outstandingCompletes  the number of read completions that are still outstanding, or {@code -1}
         *                              if the completion belongs to a multi-shot read.
         */
        protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
813 
814         /**
815          * Called once POLLRDHUP event is ready to be processed
816          */
817         private void pollRdHup(int res) {
818             ioState &= ~POLL_RDHUP_SCHEDULED;
819             pollRdhupId = 0;
820             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
821                 return;
822             }
823 
824             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
825             recvBufAllocHandle().rdHupReceived();
826 
827             if (isActive()) {
828                 scheduleFirstReadIfNeeded();
829             } else {
830                 // Just to be safe make sure the input marked as closed.
831                 shutdownInput(false);
832             }
833         }
834 
835         /**
836          * Called once POLLIN event is ready to be processed
837          */
838         private void pollIn(int res, int flags, short data) {
839             // Check if we need to rearm. This works for both cases, POLL_ADD and POLL_ADD_MULTI.
840             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
841             if (rearm) {
842                 ioState &= ~POLL_IN_SCHEDULED;
843                 pollInId = 0;
844             }
845             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
846                 return;
847             }
848             if (!readPending) {
849                 // We received the POLLIN but the user is not interested yet in reading, just mark socketHasMoreData
850                 // as true so we will trigger a read directly once the user calls read()
851                 socketHasMoreData = true;
852                 return;
853             }
854             scheduleFirstReadIfNeeded();
855         }
856 
857         private void scheduleFirstReadIfNeeded() {
858             if ((ioState & READ_SCHEDULED) == 0) {
859                 scheduleFirstRead();
860             }
861         }
862 
863         private void scheduleFirstRead() {
864             // This is a new "read loop" so we need to reset the allocHandle.
865             final ChannelConfig config = config();
866             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
867             allocHandle.reset(config);
868             scheduleRead(true);
869         }
870 
871         protected final void scheduleRead(boolean first) {
872             // Only schedule another read if the fd is still open.
873             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
874                 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
875                 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
876                     ioState |= READ_SCHEDULED;
877                 }
878             }
879         }
880 
        /**
         * Schedule a read and returns the number of {@link #readComplete(byte, int, int, short)}
         * calls that are expected because of the scheduled read. The caller narrows the returned value to a
         * {@code short} and only marks the read as scheduled if it is positive or {@code -1}.
         *
         * @param first             {@code true} if this is the first read of a read loop.
         * @param socketIsEmpty     {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
         * @return                  the number of {@link #readComplete(byte, int, int, short)} calls expected or
         *                          {@code -1} if {@link #readComplete(byte, int, int, short)} is called until
         *                          the read is cancelled (multi-shot).
         */
        protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
892 
893         /**
894          * Called once POLLOUT event is ready to be processed
895          *
896          * @param res   the result.
897          */
898         private void pollOut(int res) {
899             ioState &= ~POLL_OUT_SCHEDULED;
900             pollOutId = 0;
901             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
902                 return;
903             }
904             // pending connect
905             if (connectPromise != null) {
906                 // Note this method is invoked by the event loop only if the connection attempt was
907                 // neither cancelled nor timed out.
908 
909                 assert eventLoop().inEventLoop();
910 
911                 boolean connectStillInProgress = false;
912                 try {
913                     boolean wasActive = isActive();
914                     if (!socket.finishConnect()) {
915                         connectStillInProgress = true;
916                         return;
917                     }
918                     fulfillConnectPromise(connectPromise, wasActive);
919                 } catch (Throwable t) {
920                     fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
921                 } finally {
922                     if (!connectStillInProgress) {
923                         // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0
924                         // is used
925                         // See https://github.com/netty/netty/issues/1770
926                         cancelConnectTimeoutFuture();
927                         connectPromise = null;
928                     } else {
929                         // The connect was not done yet, register for POLLOUT again
930                         schedulePollOut();
931                     }
932                 }
933             } else if (!socket.isOutputShutdown()) {
934                 // Try writing again
935                 super.flush0();
936             }
937         }
938 
939         /**
940          * Called once a write was completed.
941          *
942          * @param op    the op code.
943          * @param res   the result.
944          * @param flags the flags.
945          * @param data  the data that was passed when submitting the op.
946          */
947         private void writeComplete(byte op, int res, int flags, short data) {
948             if ((ioState & CONNECT_SCHEDULED) != 0) {
949                 // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
950                 // TCP_FASTOPEN_CONNECT.
951                 freeMsgHdrArray();
952                 if (res > 0) {
953                     // Connect complete!
954                     outboundBuffer().removeBytes(res);
955 
956                     // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
957                     connectComplete(op, 0, flags, data);
958                 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
959                     // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
960                     // In this case, our TCP connection will be established normally, but no data was transmitted at
961                     // this time. We'll just transmit the data with normal writes later.
962                     // Let's submit a normal connect.
963                     submitConnect((InetSocketAddress) requestedRemoteAddress);
964                 } else {
965                     // There was an error, handle it as a normal connect error.
966                     connectComplete(op, res, flags, data);
967                 }
968                 return;
969             }
970 
971             if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
972                 assert numOutstandingWrites > 0;
973                 --numOutstandingWrites;
974             }
975 
976             boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
977             if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
978 
979                 // We were not able to write everything, let's register for POLLOUT
980                 schedulePollOut();
981             }
982 
983             // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
984             // while still removing messages internally in removeBytes(...) which then may corrupt state.
985             if (numOutstandingWrites == 0) {
986                 ioState &= ~WRITE_SCHEDULED;
987 
988                 // If we could write all and we did not schedule a pollout yet let us try to write again
989                 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
990                     scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
991                 }
992             }
993         }
994 
        /**
         * Called once a write was completed.
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         * @param outstanding   the outstanding write completions.
         * @return              {@code true} if everything could be written. If {@code false} is returned the
         *                      caller registers for POLLOUT unless that is already scheduled.
         */
        abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1004 
        /**
         * Called once a cancel was completed. This implementation does nothing.
         *
         * @param op            the op code
         * @param res           the result.
         * @param flags         the flags.
         * @param data          the data that was passed when submitting the op.
         */
        void cancelComplete0(byte op, int res, int flags, short data) {
            // NOOP
        }
1016 
1017         /**
1018          * Called once a connect was completed.
1019          * @param op            the op code.
1020          * @param res           the result.
1021          * @param flags         the flags.
1022          * @param data          the data that was passed when submitting the op.
1023          */
1024         void connectComplete(byte op, int res, int flags, short data) {
1025             ioState &= ~CONNECT_SCHEDULED;
1026             freeRemoteAddressMemory();
1027 
1028             if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1029                 // connect not complete yet need to wait for poll_out event
1030                 schedulePollOut();
1031             } else {
1032                 try {
1033                     if (res == 0) {
1034                         fulfillConnectPromise(connectPromise, active);
1035                         if (readPending) {
1036                             doBeginReadNow();
1037                         }
1038                     } else {
1039                         try {
1040                             Errors.throwConnectException("io_uring connect", res);
1041                         } catch (Throwable cause) {
1042                             fulfillConnectPromise(connectPromise, cause);
1043                         }
1044                     }
1045                 } finally {
1046                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
1047                     // used
1048                     // See https://github.com/netty/netty/issues/1770
1049                     cancelConnectTimeoutFuture();
1050                     connectPromise = null;
1051                 }
1052             }
1053         }
1054 
        /**
         * Starts a non-blocking connect to {@code remoteAddress}, binding the socket to {@code localAddress}
         * first if one is given. The connect (or, for TCP Fast Open, a sendmsg carrying the first pending
         * buffer) is submitted via the registration. The given promise is then remembered as the pending
         * connect promise and an optional connect timeout is scheduled.
         *
         * @param remoteAddress the address to connect to; an {@link InetSocketAddress} or a
         *                      {@link DomainSocketAddress}.
         * @param localAddress  the address to bind to before connecting, or {@code null}.
         * @param promise       the promise that is failed if the connect can not be started, or stored as the
         *                      pending connect promise otherwise.
         */
        @Override
        public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
            // non-blocking io.
            if (promise.isDone() || !ensureOpen(promise)) {
                return;
            }

            // A close is already pending, so fail the connect right away.
            if (delayedClose != null) {
                promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
                return;
            }
            try {
                // Only one connect attempt may be pending at a time.
                if (connectPromise != null) {
                    throw new ConnectionPendingException();
                }
                if (localAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    checkResolvable((InetSocketAddress) remoteAddress);
                }

                if (remote != null) {
                    // Check if already connected before trying to connect. This is needed as connect(...) will not
                    // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
                    // EINPROGRESS and finished later.
                    throw new AlreadyConnectedException();
                }

                if (localAddress != null) {
                    socket.bind(localAddress);
                }

                if (remoteAddress instanceof InetSocketAddress) {
                    InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
                    ByteBuf initialData = null;
                    // If TCP Fast Open is available and enabled, flush the outbound buffer and use its current
                    // message as initial data when it is a ByteBuf.
                    if (IoUring.isTcpFastOpenClientSideAvailable() &&
                        config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
                        ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
                        outbound.addFlush();
                        Object curr;
                        if ((curr = outbound.current()) instanceof ByteBuf) {
                            initialData = (ByteBuf) curr;
                        }
                    }
                    if (initialData != null) {
                        // Submit a sendmsg with MSG_FASTOPEN that carries the initial data instead of a plain
                        // connect.
                        msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
                        MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
                        hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
                                initialData.readableBytes(), (short) 0);

                        int fd = fd().intValue();
                        IoRegistration registration = registration();
                        IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
                                hdr.address(), hdr.idx());
                        connectId = registration.submit(ops);
                        if (connectId == 0) {
                            // Directly release the memory if submitting failed.
                            freeMsgHdrArray();
                        }
                    } else {
                        submitConnect(inetSocketAddress);
                    }
                } else if (remoteAddress instanceof DomainSocketAddress) {
                    DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
                    submitConnect(unixDomainSocketAddress);
                } else {
                    throw new Error("Unexpected SocketAddress implementation " + className(remoteAddress));
                }

                // Only mark the connect as scheduled if the submission produced an id.
                if (connectId != 0) {
                    ioState |= CONNECT_SCHEDULED;
                }
            } catch (Throwable t) {
                closeIfClosed();
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                return;
            }
            // Remember the promise and the requested address, both are used when the completion is handled.
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;
            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        // Fail the still pending connect promise and close the channel if the failure was applied.
                        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                        if (connectPromise != null && !connectPromise.isDone() &&
                                connectPromise.tryFailure(new ConnectTimeoutException(
                                        "connection timed out: " + remoteAddress))) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    // If the connect future is cancelled we also cancel the timeout and close the
                    // underlying socket.
                    if (future.isCancelled()) {
                        cancelConnectTimeoutFuture();
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
1167     }
1168 
1169     private void submitConnect(InetSocketAddress inetSocketAddress) {
1170         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1171         remoteAddressMemory = cleanable.buffer();
1172 
1173         SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1174 
1175         int fd = fd().intValue();
1176         IoRegistration registration = registration();
1177         IoUringIoOps ops = IoUringIoOps.newConnect(
1178                 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1179         connectId = registration.submit(ops);
1180         if (connectId == 0) {
1181             // Directly release the memory if submitting failed.
1182             freeRemoteAddressMemory();
1183         }
1184     }
1185 
1186     private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1187         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1188         remoteAddressMemory = cleanable.buffer();
1189         int addrLen = SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1190         int fd = fd().intValue();
1191         IoRegistration registration = registration();
1192         long addr = Buffer.memoryAddress(remoteAddressMemory);
1193         IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, addrLen, nextOpsId());
1194         connectId = registration.submit(ops);
1195         if (connectId == 0) {
1196             // Directly release the memory if submitting failed.
1197             freeRemoteAddressMemory();
1198         }
1199     }
1200 
1201     @Override
1202     protected Object filterOutboundMessage(Object msg) {
1203         if (msg instanceof ByteBuf) {
1204             ByteBuf buf = (ByteBuf) msg;
1205             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1206         }
1207         throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
1208     }
1209 
1210     @Override
1211     protected void doRegister(ChannelPromise promise) {
1212         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1213         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1214             if (f.isSuccess()) {
1215                 registration = (IoRegistration) f.getNow();
1216                 promise.setSuccess();
1217             } else {
1218                 promise.setFailure(f.cause());
1219             }
1220         });
1221     }
1222 
1223     @Override
1224     protected final void doDeregister() {
1225         // Cancel all previous submitted ops.
1226         ioUringUnsafe().cancelOps(connectPromise != null);
1227     }
1228 
1229     @Override
1230     protected void doBind(final SocketAddress local) throws Exception {
1231         if (local instanceof InetSocketAddress) {
1232             checkResolvable((InetSocketAddress) local);
1233         }
1234         socket.bind(local);
1235         this.local = socket.localAddress();
1236     }
1237 
1238     protected static void checkResolvable(InetSocketAddress addr) {
1239         if (addr.isUnresolved()) {
1240             throw new UnresolvedAddressException();
1241         }
1242     }
1243 
    /**
     * Returns the value of the {@code local} field, which {@code doBind(...)} sets to the address reported
     * by the socket after binding.
     */
    @Override
    protected final SocketAddress localAddress0() {
        return local;
    }
1248 
    /**
     * Returns the value of the {@code remote} field, which {@code computeRemote()} sets when the requested
     * remote address is an {@code InetSocketAddress}.
     */
    @Override
    protected final SocketAddress remoteAddress0() {
        return remote;
    }
1253 
1254     private static boolean isAllowHalfClosure(ChannelConfig config) {
1255         return config instanceof SocketChannelConfig &&
1256                ((SocketChannelConfig) config).isAllowHalfClosure();
1257     }
1258 
1259     private void cancelConnectTimeoutFuture() {
1260         if (connectTimeoutFuture != null) {
1261             connectTimeoutFuture.cancel(false);
1262             connectTimeoutFuture = null;
1263         }
1264     }
1265 
1266     private void computeRemote() {
1267         if (requestedRemoteAddress instanceof InetSocketAddress) {
1268             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1269         }
1270     }
1271 
1272     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1273         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1274     }
1275 
    /**
     * Returns whether the socket was guaranteed to be empty at the time the submitted I/O was executed and
     * its completion event was created.
     *
     * @param flags     the flags that were part of the completion
     * @return          {@code true} if empty.
     */
    protected abstract boolean socketIsEmpty(int flags);
1283 
    // NOTE(review): not used in this part of the file. Presumably tells whether the channel waits for a
    // POLLIN event before it schedules a read - confirm against the implementations.
    abstract boolean isPollInFirst();
1285 }