View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufHolder;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.ServerChannel;
38  import io.netty.channel.socket.ChannelInputShutdownEvent;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.SocketChannelConfig;
41  import io.netty.channel.unix.Buffer;
42  import io.netty.channel.unix.DomainSocketAddress;
43  import io.netty.channel.unix.Errors;
44  import io.netty.channel.unix.FileDescriptor;
45  import io.netty.channel.unix.UnixChannel;
46  import io.netty.channel.unix.UnixChannelUtil;
47  import io.netty.util.ReferenceCountUtil;
48  import io.netty.util.concurrent.PromiseNotifier;
49  import io.netty.util.internal.CleanableDirectBuffer;
50  import io.netty.util.internal.logging.InternalLogger;
51  import io.netty.util.internal.logging.InternalLoggerFactory;
52  
53  import java.io.IOException;
54  import java.net.InetSocketAddress;
55  import java.net.SocketAddress;
56  import java.nio.ByteBuffer;
57  import java.nio.channels.AlreadyConnectedException;
58  import java.nio.channels.ClosedChannelException;
59  import java.nio.channels.ConnectionPendingException;
60  import java.nio.channels.NotYetConnectedException;
61  import java.nio.channels.UnresolvedAddressException;
62  import java.util.concurrent.ScheduledFuture;
63  import java.util.concurrent.TimeUnit;
64  
65  import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
66  import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
67  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
68  import static io.netty.util.internal.ObjectUtil.checkNotNull;
69  import static io.netty.util.internal.StringUtil.className;
70  
71  
72  abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
    // The socket backing this channel; it is also what fd() returns.
    final LinuxSocket socket;
    // The value reported by isActive().
    protected volatile boolean active;

    // Different masks for outstanding I/O operations. These are bit flags that are OR-ed into ioState.
    // NOTE(review): bit (1 << 1) is not used by any of the masks declared here.
    private static final int POLL_IN_SCHEDULED = 1;
    private static final int POLL_OUT_SCHEDULED = 1 << 2;
    private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
    private static final int WRITE_SCHEDULED = 1 << 4;
    private static final int READ_SCHEDULED = 1 << 5;
    private static final int CONNECT_SCHEDULED = 1 << 6;

    // Counter used by nextOpsId() to hand out ids; 0 is skipped because it means "none".
    private short opsId = Short.MIN_VALUE;

    // Ids returned when the respective operation was submitted; they are used to cancel the operation later
    // and are reset to 0 once a cancel was submitted.
    private long pollInId;
    private long pollOutId;
    private long pollRdhupId;
    private long connectId;

    // Bit set of the *_SCHEDULED masks above. A byte is enough for now.
    private byte ioState;

    // It's possible that multiple read / writes are issued. We need to keep track of these.
    // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
    // enough but let's be a bit more flexible for now.
    private short numOutstandingWrites;
    // A value of -1 means that multi-shot is used and so reads will be issued as long as the request is not canceled.
    private short numOutstandingReads;

    // Set by doBeginRead() when a read was requested; cleared by clearRead() and while handling read completions.
    private boolean readPending;
    // True while a read completion is being processed; doBeginRead() then only records the request.
    private boolean inReadComplete;
    // Set from the completion flags of the last read; when true doBeginReadNow() reads directly instead of
    // waiting for POLLIN first.
    private boolean socketHasMoreData;
105 
106     private static final class DelayedClose {
107         private final ChannelPromise promise;
108         private final Throwable cause;
109         private final ClosedChannelException closeCause;
110 
111         DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
112             this.promise = promise;
113             this.cause = cause;
114             this.closeCause = closeCause;
115         }
116     }
    // Non-null once a close was requested; holds the arguments until the close can actually be executed.
    private DelayedClose delayedClose;
    // Once true, doBeginRead() and doBeginReadNow() return without scheduling any further read.
    private boolean inputClosedSeenErrorOnRead;

    /**
     * The future of the current connection attempt.  If not null, subsequent connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    // NOTE(review): presumably the timeout task and target address of the pending connect; the code that sets
    // them is not next to these declarations - confirm.
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;
    // cleanable and remoteAddressMemory are released together by freeRemoteAddressMemory().
    private CleanableDirectBuffer cleanable;
    private ByteBuffer remoteAddressMemory;
    // Released by freeMsgHdrArray().
    private MsgHdrMemoryArray msgHdrMemoryArray;

    // Used to submit operations; doClose() treats null as "never registered".
    private IoRegistration registration;

    // Cached addresses of the socket.
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
134 
135     AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
136         super(parent);
137         this.socket = checkNotNull(socket, "fd");
138 
139         if (active) {
140             // Directly cache the remote and local addresses
141             // See https://github.com/netty/netty/issues/2359
142             this.active = true;
143             this.local = socket.localAddress();
144             this.remote = socket.remoteAddress();
145         }
146 
147         logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
148     }
149 
150     AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
151         super(parent);
152         this.socket = checkNotNull(fd, "fd");
153         this.active = true;
154 
155         // Directly cache the remote and local addresses
156         // See https://github.com/netty/netty/issues/2359
157         this.remote = remote;
158         this.local = fd.localAddress();
159     }
160 
161     // Called once a Channel changed from AUTO_READ=true to AUTO_READ=false
162     final void autoReadCleared() {
163         if (!isRegistered()) {
164             return;
165         }
166         IoRegistration registration = this.registration;
167         if (registration == null || !registration.isValid()) {
168             return;
169         }
170         if (eventLoop().inEventLoop()) {
171             clearRead();
172         } else {
173             eventLoop().execute(this::clearRead);
174         }
175     }
176 
177     private void clearRead() {
178         assert eventLoop().inEventLoop();
179         readPending = false;
180         IoRegistration registration = this.registration;
181         if (registration == null || !registration.isValid()) {
182             return;
183         }
184         // Also cancel all outstanding reads as the user did signal there is no more desire to read.
185         cancelOutstandingReads(registration(), numOutstandingReads);
186     }
187 
188     /**
189      * Returns the next id that should be used when submitting {@link IoUringIoOps}.
190      *
191      * @return  opsId
192      */
193     protected final short nextOpsId() {
194         short id = opsId++;
195 
196         // We use 0 for "none".
197         if (id == 0) {
198             id = opsId++;
199         }
200         return id;
201     }
202 
203     public final boolean isOpen() {
204         return socket.isOpen();
205     }
206 
207     @Override
208     public boolean isActive() {
209         return active;
210     }
211 
212     @Override
213     public final FileDescriptor fd() {
214         return socket;
215     }
216 
217     private AbstractUringUnsafe ioUringUnsafe() {
218         return (AbstractUringUnsafe) unsafe();
219     }
220 
221     @Override
222     protected boolean isCompatible(final EventLoop loop) {
223         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
224     }
225 
226     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
227         return newDirectBuffer(buf, buf);
228     }
229 
230     protected boolean allowMultiShotPollIn() {
231         return IoUring.isPollAddMultishotEnabled();
232     }
233 
234     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
235         final int readableBytes = buf.readableBytes();
236         if (readableBytes == 0) {
237             ReferenceCountUtil.release(holder);
238             return Unpooled.EMPTY_BUFFER;
239         }
240 
241         final ByteBufAllocator alloc = alloc();
242         if (alloc.isDirectBufferPooled()) {
243             return newDirectBuffer0(holder, buf, alloc, readableBytes);
244         }
245 
246         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
247         if (directBuf == null) {
248             return newDirectBuffer0(holder, buf, alloc, readableBytes);
249         }
250 
251         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
252         ReferenceCountUtil.safeRelease(holder);
253         return directBuf;
254     }
255 
256     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
257         final ByteBuf directBuf = alloc.directBuffer(capacity);
258         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
259         ReferenceCountUtil.safeRelease(holder);
260         return directBuf;
261     }
262 
    /**
     * Cancel all outstanding reads. This is called when reading is cleared and when the pending operations
     * are cancelled as part of closing the channel.
     *
     * @param registration          the {@link IoRegistration}.
     * @param numOutstandingReads   the number of outstanding reads, or {@code -1} if multi-shot was used.
     */
    protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
270 
    /**
     * Cancel all outstanding writes. This is called when the pending operations are cancelled as part of
     * closing the channel.
     *
     * @param registration          the {@link IoRegistration}.
     * @param numOutstandingWrites  the number of outstanding writes.
     */
    protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
278 
    @Override
    protected void doDisconnect() throws Exception {
        // Intentionally empty: this base class does nothing on disconnect.
    }
282 
283     private void freeRemoteAddressMemory() {
284         if (remoteAddressMemory != null) {
285             cleanable.clean();
286             cleanable = null;
287             remoteAddressMemory = null;
288         }
289     }
290 
291     private void freeMsgHdrArray() {
292         if (msgHdrMemoryArray != null) {
293             msgHdrMemoryArray.release();
294             msgHdrMemoryArray = null;
295         }
296     }
297 
298     @Override
299     protected void doClose() throws Exception {
300         active = false;
301 
302         if (registration != null) {
303             if (socket.markClosed()) {
304                 int fd = fd().intValue();
305                 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
306                 registration.submit(ops);
307             }
308         } else {
309             // This one was never registered just use a syscall to close.
310             socket.close();
311             ioUringUnsafe().unregistered();
312         }
313     }
314 
315     @Override
316     protected final void doBeginRead() {
317         if (inputClosedSeenErrorOnRead) {
318             // We did see an error while reading and so closed the input. Stop reading.
319             return;
320         }
321         if (readPending) {
322             // We already have a read pending.
323             return;
324         }
325         readPending = true;
326         if (inReadComplete || !isActive()) {
327             // We are currently in the readComplete(...) callback which might issue more reads by itself.
328             // If readComplete(...) will not issue more reads itself it will pick up the readPending flag, reset it and
329             // call doBeginReadNow().
330             return;
331         }
332         doBeginReadNow();
333     }
334 
335     private void doBeginReadNow() {
336         if (inputClosedSeenErrorOnRead) {
337             // We did see an error while reading and so closed the input.
338             return;
339         }
340         if (!isPollInFirst() ||
341                 // If the socket was not empty, and we stopped reading we need to ensure we just force the
342                 // read as POLLIN might be edge-triggered (in case of POLL_ADD_MULTI).
343                 socketHasMoreData) {
344             // If the socket is blocking we will directly call scheduleFirstReadIfNeeded() as we can use FASTPOLL.
345             ioUringUnsafe().scheduleFirstReadIfNeeded();
346         } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
347             ioUringUnsafe().schedulePollIn();
348         }
349     }
350 
351     @Override
352     protected void doWrite(ChannelOutboundBuffer in) {
353         scheduleWriteIfNeeded(in, true);
354     }
355 
356     protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
357         if ((ioState & WRITE_SCHEDULED) != 0) {
358             return;
359         }
360         if (scheduleWrite(in) > 0) {
361             ioState |= WRITE_SCHEDULED;
362             if (submitAndRunNow && !isWritable()) {
363                 submitAndRunNow();
364             }
365         }
366     }
367 
    // Hook that is called by scheduleWriteIfNeeded(...) after a write was scheduled while the channel is not
    // writable. Does nothing by default.
    protected void submitAndRunNow() {
        // NOOP
    }
371 
372     private int scheduleWrite(ChannelOutboundBuffer in) {
373         if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
374             return 0;
375         }
376         if (in == null) {
377             return 0;
378         }
379 
380         int msgCount = in.size();
381         if (msgCount == 0) {
382             return 0;
383         }
384         Object msg = in.current();
385 
386         if (msgCount > 1 && in.current() instanceof ByteBuf) {
387             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
388         } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
389                     (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
390             // We also need some special handling for CompositeByteBuf
391             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
392         } else {
393             numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
394         }
395         // Ensure we never overflow
396         assert numOutstandingWrites > 0;
397         return numOutstandingWrites;
398     }
399 
400     protected final IoRegistration registration() {
401         assert registration != null;
402         return registration;
403     }
404 
405     private void schedulePollOut() {
406         pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
407     }
408 
409     final void schedulePollRdHup() {
410         pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
411     }
412 
413     private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
414         assert (ioState & ioMask) == 0;
415         int fd = fd().intValue();
416         IoRegistration registration = registration();
417         IoUringIoOps ops = IoUringIoOps.newPollAdd(
418                 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
419         long id = registration.submit(ops);
420         if (id != 0) {
421             ioState |= (byte) ioMask;
422         }
423         return id;
424     }
425 
426     final void resetCachedAddresses() {
427         local = socket.localAddress();
428         remote = socket.remoteAddress();
429     }
430 
431     protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
        // Lazily created by recvBufAllocHandle().
        private IoUringRecvByteAllocatorHandle allocHandle;
        // Set once the completion of the close operation was received (and it was not cancelled).
        private boolean closed;
        // Updated from the completion flags each time a read completion is handled.
        private boolean socketIsEmpty;
435 
        /**
         * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param in    the {@link ChannelOutboundBuffer} that holds the messages to write.
         * @return      the number of expected completions; the caller asserts that it is greater than {@code 0}.
         */
        protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
441 
        /**
         * Schedule the write of a single message and returns the number of
         * {@link #writeComplete(byte, int, int, short)} calls that are expected because of the scheduled write.
         *
         * @param msg   the message to write.
         * @return      the number of expected completions; the caller asserts that it is greater than {@code 0}.
         */
        protected abstract int scheduleWriteSingle(Object msg);
447 
448         @Override
449         public final void handle(IoRegistration registration, IoEvent ioEvent) {
450             IoUringIoEvent event = (IoUringIoEvent) ioEvent;
451             byte op = event.opcode();
452             int res = event.res();
453             int flags = event.flags();
454             short data = event.data();
455             switch (op) {
456                 case Native.IORING_OP_RECV:
457                 case Native.IORING_OP_ACCEPT:
458                 case Native.IORING_OP_RECVMSG:
459                 case Native.IORING_OP_READ:
460                     readComplete(op, res, flags, data);
461                     break;
462                 case Native.IORING_OP_WRITEV:
463                 case Native.IORING_OP_SEND:
464                 case Native.IORING_OP_SENDMSG:
465                 case Native.IORING_OP_WRITE:
466                 case Native.IORING_OP_SPLICE:
467                 case Native.IORING_OP_SEND_ZC:
468                 case Native.IORING_OP_SENDMSG_ZC:
469                     writeComplete(op, res, flags, data);
470                     break;
471                 case Native.IORING_OP_POLL_ADD:
472                     pollAddComplete(res, flags, data);
473                     break;
474                 case Native.IORING_OP_ASYNC_CANCEL:
475                     cancelComplete0(op, res, flags, data);
476                     break;
477                 case Native.IORING_OP_CONNECT:
478                     connectComplete(op, res, flags, data);
479 
480                     // once the connect was completed we can also free some resources that are not needed anymore.
481                     freeMsgHdrArray();
482                     freeRemoteAddressMemory();
483                     break;
484                 case Native.IORING_OP_CLOSE:
485                     if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
486                         if (delayedClose != null) {
487                             delayedClose.promise.setSuccess();
488                         }
489                         closed = true;
490                     }
491                     break;
492                 default:
493                     break;
494             }
495 
496             // We delay the actual close if there is still a write or read scheduled, let's see if there
497             // was a close that needs to be done now.
498             handleDelayedClosed();
499 
500             if (ioState == 0 && closed) {
501                 // Cancel the registration now.
502                 registration.cancel();
503             }
504         }
505 
506         @Override
507         public void unregistered() {
508             freeMsgHdrArray();
509             freeRemoteAddressMemory();
510         }
511 
512         private void handleDelayedClosed() {
513             if (delayedClose != null && canCloseNow()) {
514                 closeNow();
515             }
516         }
517 
518         private void pollAddComplete(int res, int flags, short data) {
519             if ((res & Native.POLLOUT) != 0) {
520                 pollOut(res);
521             }
522             if ((res & Native.POLLIN) != 0) {
523                 pollIn(res, flags, data);
524             }
525             if ((res & Native.POLLRDHUP) != 0) {
526                 pollRdHup(res);
527             }
528         }
529 
530         @Override
531         public final void close() throws Exception {
532             close(voidPromise());
533         }
534 
535         @Override
536         protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
537             if (closeFuture().isDone()) {
538                 // Closed already before.
539                 safeSetSuccess(promise);
540                 return;
541             }
542             if (delayedClose == null) {
543                 // We have a write operation pending that should be completed asap.
544                 // We will do the actual close operation one this write result is returned as otherwise
545                 // we may get into trouble as we may close the fd while we did not process the write yet.
546                 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
547             } else {
548                 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
549                 return;
550             }
551 
552             boolean cancelConnect = false;
553             try {
554                 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
555                 if (connectPromise != null) {
556                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
557                     connectPromise.tryFailure(new ClosedChannelException());
558                     AbstractIoUringChannel.this.connectPromise = null;
559                     cancelConnect = true;
560                 }
561 
562                 cancelConnectTimeoutFuture();
563             } finally {
564                 // It's important we cancel all outstanding connect, write and read operations now so
565                 // we will be able to process a delayed close if needed.
566                 cancelOps(cancelConnect);
567             }
568 
569             if (canCloseNow()) {
570                 // Currently there are is no WRITE and READ scheduled so we can start to teardown the channel.
571                 closeNow();
572             }
573         }
574 
575         private void cancelOps(boolean cancelConnect) {
576             if (registration == null || !registration.isValid()) {
577                 return;
578             }
579             byte flags = (byte) 0;
580             if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
581                 long id = registration.submit(
582                         IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
583                 assert id != 0;
584                 pollRdhupId = 0;
585             }
586             if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
587                 long id = registration.submit(
588                         IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
589                 assert id != 0;
590                 pollInId = 0;
591             }
592             if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
593                 long id = registration.submit(
594                         IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
595                 assert id != 0;
596                 pollOutId = 0;
597             }
598             if (cancelConnect && connectId != 0) {
599                 // Best effort to cancel the already submitted connect request.
600                 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
601                 assert id != 0;
602                 connectId = 0;
603             }
604             cancelOutstandingReads(registration, numOutstandingReads);
605             cancelOutstandingWrites(registration, numOutstandingWrites);
606         }
607 
608         private boolean canCloseNow() {
609             // Currently there are is no WRITE and READ scheduled, we can close the channel now without
610             // problems related to re-ordering of completions.
611             return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
612         }
613 
        /**
         * Extra condition that is checked by {@code canCloseNow()} before the scheduled reads and writes are
         * considered. Always {@code true} in this implementation.
         */
        protected boolean canCloseNow0() {
            return true;
        }
617 
618         private void closeNow() {
619             super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
620         }
621 
622         @Override
623         protected final void flush0() {
624             // Flush immediately only when there's no pending flush.
625             // If there's a pending flush operation, event loop will call forceFlush() later,
626             // and thus there's no need to call it now.
627             if ((ioState & POLL_OUT_SCHEDULED) == 0) {
628                 super.flush0();
629             }
630         }
631 
632         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
633             if (promise == null) {
634                 // Closed via cancellation and the promise has been notified already.
635                 return;
636             }
637 
638             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
639             promise.tryFailure(cause);
640             closeIfClosed();
641         }
642 
643         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
644             if (promise == null) {
645                 // Closed via cancellation and the promise has been notified already.
646                 return;
647             }
648             active = true;
649 
650             if (local == null) {
651                 local = socket.localAddress();
652             }
653             computeRemote();
654 
655             // Register POLLRDHUP
656             schedulePollRdHup();
657 
658             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
659             // We still need to ensure we call fireChannelActive() in this case.
660             boolean active = isActive();
661 
662             // trySuccess() will return false if a user cancelled the connection attempt.
663             boolean promiseSet = promise.trySuccess();
664 
665             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
666             // because what happened is what happened.
667             if (!wasActive && active) {
668                 pipeline().fireChannelActive();
669             }
670 
671             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
672             if (!promiseSet) {
673                 close(voidPromise());
674             }
675         }
676 
677         @Override
678         public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
679             if (allocHandle == null) {
680                 allocHandle = new IoUringRecvByteAllocatorHandle(
681                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
682             }
683             return allocHandle;
684         }
685 
686         final void shutdownInput(boolean allDataRead) {
687             logger.trace("shutdownInput Fd: {}", fd().intValue());
688             if (!socket.isInputShutdown()) {
689                 if (isAllowHalfClosure(config())) {
690                     try {
691                         socket.shutdown(true, false);
692                     } catch (IOException ignored) {
693                         // We attempted to shutdown and failed, which means the input has already effectively been
694                         // shutdown.
695                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
696                         return;
697                     } catch (NotYetConnectedException ignore) {
698                         // We attempted to shutdown and failed, which means the input has already effectively been
699                         // shutdown.
700                     }
701                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
702                 } else {
703                     // Handle this same way as if we did read all data so we don't schedule another read.
704                     inputClosedSeenErrorOnRead = true;
705                     close(voidPromise());
706                     return;
707                 }
708             }
709             if (allDataRead && !inputClosedSeenErrorOnRead) {
710                 inputClosedSeenErrorOnRead = true;
711                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
712             }
713         }
714 
715         private void fireEventAndClose(Object evt) {
716             pipeline().fireUserEventTriggered(evt);
717             close(voidPromise());
718         }
719 
720         final void schedulePollIn() {
721             assert (ioState & POLL_IN_SCHEDULED) == 0;
722             if (!isActive() || shouldBreakIoUringInReady(config())) {
723                 return;
724             }
725             pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
726         }
727 
728         private void readComplete(byte op, int res, int flags, short data) {
                // Handles one completion event of a scheduled read. The method first updates the read bookkeeping
                // (READ_SCHEDULED, readPending, numOutstandingReads), then hands the result to readComplete0(...)
                // and finally decides whether another read has to be scheduled or a multi-shot read has to be
                // cancelled. A numOutstandingReads value of -1 marks a multi-shot read.
729             assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;
730 
731             boolean multishot = numOutstandingReads == -1;
732             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
733             if (rearm) {
734                 // IORING_CQE_F_MORE is not set, so no further completion is announced for this submission.
735                 // Reset READ_SCHEDULED so a new read can be armed; this works for multi-shot and single-shot reads.
736                 ioState &= ~READ_SCHEDULED;
737             }
                // Remember whether a read was requested before readPending is reset below.
738             boolean pending = readPending;
739             if (multishot) {
740                 // Reset readPending so we can still keep track if we might need to cancel the multi-shot read or
741                 // not.
742                 readPending = false;
743             } else if (--numOutstandingReads == 0) {
744                 // We received all outstanding completions.
745                 readPending = false;
746                 ioState &= ~READ_SCHEDULED;
747             }
                // Flag that we are inside the completion handling until the finally block below has run.
748             inReadComplete = true;
749             try {
                    // Derive the socket state from the completion flags before handing over to the subclass.
750                 socketIsEmpty = socketIsEmpty(flags);
751                 socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
752                         (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
753                 readComplete0(op, res, flags, data, numOutstandingReads);
754             } finally {
755                 try {
756                     // Check if we should consider the read loop to be done.
757                     if (recvBufAllocHandle().isReadComplete()) {
758                         // Reset the handle as we are done with the read-loop.
759                         recvBufAllocHandle().reset(config());
760 
761                         // Check if this was a readComplete(...) triggered by a read or multi-shot read.
762                         if (!multishot) {
763                             if (readPending) {
764                                 // This was a "normal" read and the user did signal we should continue reading.
765                                 // Let's schedule the read now.
766                                 doBeginReadNow();
767                             }
768                         } else {
769                             // The readComplete(...) was triggered by a multi-shot read. Because of this the state
770                             // machine is a bit more complicated.
771 
772                             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
773                                 // The readComplete(...) was triggered because the previous read was cancelled.
774                                 // In this case we need to check if the user did signal the desire to read again
775                                 // in the meantime. If this is the case we need to schedule the read to ensure
776                                 // we do not stall.
777                                 if (pending) {
778                                     doBeginReadNow();
779                                 }
780                             } else if (rearm) {
781                                 // We need to rearm the multishot as otherwise we might miss some data.
782                                 doBeginReadNow();
783                             } else if (!readPending) {
784                                 // Cancel the multi-shot read now as the user did not signal that we want to keep
785                                 // reading while we handle the completion event.
786                                 cancelOutstandingReads(registration, numOutstandingReads);
787                             }
788                         }
789                     } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
790                         // The readComplete(...) was triggered because the previous read was cancelled.
791                         // In this case we need to check if the user did signal the desire to read again
792                         // in the meantime. If this is the case we need to schedule the read to ensure
793                         // we do not stall.
794                         if (pending) {
795                             doBeginReadNow();
796                         }
797                     } else if (multishot && rearm) {
798                         // We need to rearm the multishot as otherwise we might miss some data.
799                         doBeginReadNow();
800                     }
801                 } finally {
                        // Always clear the per-completion state, even if scheduling the next read failed.
802                     inReadComplete = false;
803                     socketIsEmpty = false;
804                 }
805             }
806         }
807 
808         /**
809          * Called once a read was completed.
             *
             * @param op                    the op code.
             * @param res                   the result; {@code Native.ERRNO_ECANCELED_NEGATIVE} if the read was cancelled.
             * @param flags                 the flags of the completion.
             * @param data                  the data that was passed when submitting the op.
             * @param outstandingCompletes  the number of completions that are still expected for the scheduled read,
             *                              or {@code -1} for a multi-shot read.
810          */
811         protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
812 
813         /**
814          * Called once POLLRDHUP event is ready to be processed
815          */
816         private void pollRdHup(int res) {
817             ioState &= ~POLL_RDHUP_SCHEDULED;
818             pollRdhupId = 0;
819             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
820                 return;
821             }
822 
823             // Mark that we received a POLLRDHUP and so need to continue reading until all the input ist drained.
824             recvBufAllocHandle().rdHupReceived();
825 
826             if (isActive()) {
827                 scheduleFirstReadIfNeeded();
828             } else {
829                 // Just to be safe make sure the input marked as closed.
830                 shutdownInput(false);
831             }
832         }
833 
834         /**
835          * Called once POLLIN event is ready to be processed
836          */
837         private void pollIn(int res, int flags, short data) {
838             // Check if we need to rearm. This works for both cases, POLL_ADD and POLL_ADD_MULTI.
839             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
840             if (rearm) {
841                 ioState &= ~POLL_IN_SCHEDULED;
842                 pollInId = 0;
843             }
844             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
845                 return;
846             }
847             if (!readPending) {
848                 // We received the POLLIN but the user is not interested yet in reading, just mark socketHasMoreData
849                 // as true so we will trigger a read directly once the user calls read()
850                 socketHasMoreData = true;
851                 return;
852             }
853             scheduleFirstReadIfNeeded();
854         }
855 
856         private void scheduleFirstReadIfNeeded() {
857             if ((ioState & READ_SCHEDULED) == 0) {
858                 scheduleFirstRead();
859             }
860         }
861 
862         private void scheduleFirstRead() {
863             // This is a new "read loop" so we need to reset the allocHandle.
864             final ChannelConfig config = config();
865             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
866             allocHandle.reset(config);
867             scheduleRead(true);
868         }
869 
870         protected final void scheduleRead(boolean first) {
871             // Only schedule another read if the fd is still open.
872             if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
873                 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
874                 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
875                     ioState |= READ_SCHEDULED;
876                 }
877             }
878         }
879 
880         /**
881          * Schedule a read and returns the number of {@link #readComplete(byte, int, int, short)}
882          * calls that are expected because of the scheduled read.
             * <p>
             * The caller only marks the read as scheduled if the returned value is greater than {@code 0} or
             * equal to {@code -1}.
883          *
884          * @param first             {@code true} if this is the first read of a read loop.
885          * @param socketIsEmpty     {@code true} if the socket is guaranteed to be empty, {@code false} otherwise.
886          * @return                  the number of {@link #readComplete(byte, int, int, short)} calls expected or
887          *                          {@code -1} if {@link #readComplete(byte, int, int, short)} is called until
888          *                          the read is cancelled (multi-shot).
889          */
890         protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
891 
892         /**
893          * Called once POLLOUT event is ready to be processed
894          *
895          * @param res   the result.
896          */
897         private void pollOut(int res) {
898             ioState &= ~POLL_OUT_SCHEDULED;
899             pollOutId = 0;
900             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
901                 return;
902             }
903             // pending connect
904             if (connectPromise != null) {
905                 // Note this method is invoked by the event loop only if the connection attempt was
906                 // neither cancelled nor timed out.
907 
908                 assert eventLoop().inEventLoop();
909 
910                 boolean connectStillInProgress = false;
911                 try {
912                     boolean wasActive = isActive();
913                     if (!socket.finishConnect()) {
914                         connectStillInProgress = true;
915                         return;
916                     }
917                     fulfillConnectPromise(connectPromise, wasActive);
918                 } catch (Throwable t) {
919                     fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
920                 } finally {
921                     if (!connectStillInProgress) {
922                         // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0
923                         // is used
924                         // See https://github.com/netty/netty/issues/1770
925                         cancelConnectTimeoutFuture();
926                         connectPromise = null;
927                     } else {
928                         // The connect was not done yet, register for POLLOUT again
929                         schedulePollOut();
930                     }
931                 }
932             } else if (!socket.isOutputShutdown()) {
933                 // Try writing again
934                 super.flush0();
935             }
936         }
937 
938         /**
939          * Called once a write was completed.
940          *
941          * @param op    the op code.
942          * @param res   the result.
943          * @param flags the flags.
944          * @param data  the data that was passed when submitting the op.
945          */
946         private void writeComplete(byte op, int res, int flags, short data) {
947             if ((ioState & CONNECT_SCHEDULED) != 0) {
948                 // The writeComplete(...) callback was called because of a sendmsg(...) result that was used for
949                 // TCP_FASTOPEN_CONNECT.
950                 freeMsgHdrArray();
951                 if (res > 0) {
952                     // Connect complete!
953                     outboundBuffer().removeBytes(res);
954 
955                     // Explicit pass in 0 as this is returned by a connect(...) call when it was successful.
956                     connectComplete(op, 0, flags, data);
957                 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
958                     // This happens when we (as a client) have no pre-existing cookie for doing a fast-open connection.
959                     // In this case, our TCP connection will be established normally, but no data was transmitted at
960                     // this time. We'll just transmit the data with normal writes later.
961                     // Let's submit a normal connect.
962                     submitConnect((InetSocketAddress) requestedRemoteAddress);
963                 } else {
964                     // There was an error, handle it as a normal connect error.
965                     connectComplete(op, res, flags, data);
966                 }
967                 return;
968             }
969 
970             if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
971                 assert numOutstandingWrites > 0;
972                 --numOutstandingWrites;
973             }
974 
975             boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
976             if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
977 
978                 // We were not able to write everything, let's register for POLLOUT
979                 schedulePollOut();
980             }
981 
982             // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
983             // while still removing messages internally in removeBytes(...) which then may corrupt state.
984             if (numOutstandingWrites == 0) {
985                 ioState &= ~WRITE_SCHEDULED;
986 
987                 // If we could write all and we did not schedule a pollout yet let us try to write again
988                 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
989                     scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
990                 }
991             }
992         }
993 
994         /**
995          * Called once a write was completed.
996          * @param op            the op code
997          * @param res           the result.
998          * @param flags         the flags.
999          * @param data          the data that was passed when submitting the op.
1000          * @param outstanding   the outstanding write completions.
               * @return              {@code true} if everything was written; on {@code false} the caller registers for
               *                      POLLOUT unless one is already scheduled.
1001          */
1002         abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1003 
1004         /**
1005          * Called once a cancel was completed. This implementation does nothing.
1006          *
1007          * @param op            the op code
1008          * @param res           the result.
1009          * @param flags         the flags.
1010          * @param data          the data that was passed when submitting the op.
1011          */
1012         void cancelComplete0(byte op, int res, int flags, short data) {
1013             // Intentionally a no-op.
1014         }
1015 
1016         /**
1017          * Called once a connect was completed.
1018          * @param op            the op code.
1019          * @param res           the result.
1020          * @param flags         the flags.
1021          * @param data          the data that was passed when submitting the op.
1022          */
1023         void connectComplete(byte op, int res, int flags, short data) {
1024             ioState &= ~CONNECT_SCHEDULED;
1025             freeRemoteAddressMemory();
1026 
1027             if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1028                 // connect not complete yet need to wait for poll_out event
1029                 schedulePollOut();
1030             } else {
1031                 try {
1032                     if (res == 0) {
1033                         fulfillConnectPromise(connectPromise, active);
1034                         if (readPending) {
1035                             doBeginReadNow();
1036                         }
1037                     } else {
1038                         try {
1039                             Errors.throwConnectException("io_uring connect", res);
1040                         } catch (Throwable cause) {
1041                             fulfillConnectPromise(connectPromise, cause);
1042                         }
1043                     }
1044                 } finally {
1045                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
1046                     // used
1047                     // See https://github.com/netty/netty/issues/1770
1048                     cancelConnectTimeoutFuture();
1049                     connectPromise = null;
1050                 }
1051             }
1052         }
1053 
1054         @Override
1055         public void connect(
1056                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
                 // Validates the request, optionally binds to localAddress, submits the connect (or a sendmsg(...)
                 // with MSG_FASTOPEN when TCP fast open is usable and data is already queued) and finally wires up
                 // the connect timeout and cancellation handling for the given promise.
1057             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
1058             // non-blocking io.
1059             if (promise.isDone() || !ensureOpen(promise)) {
1060                 return;
1061             }
1062 
                 // A close is already pending, so fail the connect right away.
1063             if (delayedClose != null) {
1064                 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
1065                 return;
1066             }
1067             try {
1068                 if (connectPromise != null) {
1069                     throw new ConnectionPendingException();
1070                 }
1071                 if (localAddress instanceof InetSocketAddress) {
1072                     checkResolvable((InetSocketAddress) localAddress);
1073                 }
1074 
1075                 if (remoteAddress instanceof InetSocketAddress) {
1076                     checkResolvable((InetSocketAddress) remoteAddress);
1077                 }
1078 
1079                 if (remote != null) {
1080                     // Check if already connected before trying to connect. This is needed as connect(...) will not
1081                     // return -1 and set errno to EISCONN if a previous connect(...) attempt was setting errno to
1082                     // EINPROGRESS and finished later.
1083                     throw new AlreadyConnectedException();
1084                 }
1085 
1086                 if (localAddress != null) {
1087                     socket.bind(localAddress);
1088                 }
1089 
1090                 if (remoteAddress instanceof InetSocketAddress) {
1091                     InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
1092                     ByteBuf initialData = null;
                         // With TCP fast open enabled the first flushed ByteBuf (if any) is sent together with the
                         // connect.
1093                     if (IoUring.isTcpFastOpenClientSideAvailable() &&
1094                         config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
1095                         ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
1096                         outbound.addFlush();
1097                         Object curr;
1098                         if ((curr = outbound.current()) instanceof ByteBuf) {
1099                             initialData = (ByteBuf) curr;
1100                         }
1101                     }
1102                     if (initialData != null) {
                             // Submit a sendmsg(...) with MSG_FASTOPEN that carries the initial data; its completion
                             // is handled in writeComplete(...).
1103                         msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
1104                         MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
1105                         hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
1106                                 initialData.readableBytes(), (short) 0);
1107 
1108                         int fd = fd().intValue();
1109                         IoRegistration registration = registration();
1110                         IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
1111                                 hdr.address(), hdr.idx());
1112                         connectId = registration.submit(ops);
1113                         if (connectId == 0) {
1114                             // Directly release the memory if submitting failed.
1115                             freeMsgHdrArray();
1116                         }
1117                     } else {
1118                         submitConnect(inetSocketAddress);
1119                     }
1120                 } else if (remoteAddress instanceof DomainSocketAddress) {
1121                     DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
1122                     submitConnect(unixDomainSocketAddress);
1123                 } else {
1124                     throw new Error("Unexpected SocketAddress implementation " + className(remoteAddress));
1125                 }
1126 
                     // Only flag the connect as scheduled if the submission produced an id.
1127                 if (connectId != 0) {
1128                     ioState |= CONNECT_SCHEDULED;
1129                 }
1130             } catch (Throwable t) {
1131                 closeIfClosed();
1132                 promise.tryFailure(annotateConnectException(t, remoteAddress));
1133                 return;
1134             }
                 // Remember the promise and the requested address for the completion handling.
1135             connectPromise = promise;
1136             requestedRemoteAddress = remoteAddress;
1137             // Schedule connect timeout.
1138             int connectTimeoutMillis = config().getConnectTimeoutMillis();
1139             if (connectTimeoutMillis > 0) {
1140                 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
1141                     @Override
1142                     public void run() {
                             // Fail the connect and close the channel if the promise is still pending on timeout.
1143                         ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
1144                         if (connectPromise != null && !connectPromise.isDone() &&
1145                                 connectPromise.tryFailure(new ConnectTimeoutException(
1146                                         "connection timed out: " + remoteAddress))) {
1147                             close(voidPromise());
1148                         }
1149                     }
1150                 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1151             }
1152 
1153             promise.addListener(new ChannelFutureListener() {
1154                 @Override
1155                 public void operationComplete(ChannelFuture future) {
1156                     // If the connect future is cancelled we also cancel the timeout and close the
1157                     // underlying socket.
1158                     if (future.isCancelled()) {
1159                         cancelConnectTimeoutFuture();
1160                         connectPromise = null;
1161                         close(voidPromise());
1162                     }
1163                 }
1164             });
1165         }
1166     }
1167 
1168     private void submitConnect(InetSocketAddress inetSocketAddress) {
1169         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1170         remoteAddressMemory = cleanable.buffer();
1171 
1172         SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1173 
1174         int fd = fd().intValue();
1175         IoRegistration registration = registration();
1176         IoUringIoOps ops = IoUringIoOps.newConnect(
1177                 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1178         connectId = registration.submit(ops);
1179         if (connectId == 0) {
1180             // Directly release the memory if submitting failed.
1181             freeRemoteAddressMemory();
1182         }
1183     }
1184 
1185     private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1186         cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1187         remoteAddressMemory = cleanable.buffer();
1188         SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1189         int fd = fd().intValue();
1190         IoRegistration registration = registration();
1191         long addr = Buffer.memoryAddress(remoteAddressMemory);
1192         IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, Native.SIZEOF_SOCKADDR_UN, nextOpsId());
1193         connectId = registration.submit(ops);
1194         if (connectId == 0) {
1195             // Directly release the memory if submitting failed.
1196             freeRemoteAddressMemory();
1197         }
1198     }
1199 
1200     @Override
1201     protected Object filterOutboundMessage(Object msg) {
1202         if (msg instanceof ByteBuf) {
1203             ByteBuf buf = (ByteBuf) msg;
1204             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1205         }
1206         throw new UnsupportedOperationException("unsupported message type");
1207     }
1208 
1209     @Override
1210     protected void doRegister(ChannelPromise promise) {
1211         IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1212         eventLoop.register(ioUringUnsafe()).addListener(f -> {
1213             if (f.isSuccess()) {
1214                 registration = (IoRegistration) f.getNow();
1215                 promise.setSuccess();
1216             } else {
1217                 promise.setFailure(f.cause());
1218             }
1219         });
1220     }
1221 
1222     @Override
1223     protected final void doDeregister() {
1224         // Cancel all previous submitted ops.
1225         ioUringUnsafe().cancelOps(connectPromise != null);
1226     }
1227 
1228     @Override
1229     protected void doBind(final SocketAddress local) throws Exception {
1230         if (local instanceof InetSocketAddress) {
1231             checkResolvable((InetSocketAddress) local);
1232         }
1233         socket.bind(local);
1234         this.local = socket.localAddress();
1235     }
1236 
1237     protected static void checkResolvable(InetSocketAddress addr) {
1238         if (addr.isUnresolved()) {
1239             throw new UnresolvedAddressException();
1240         }
1241     }
1242 
1243     @Override
1244     protected final SocketAddress localAddress0() {
             // Returns the stored local address; within this part of the file it is assigned by doBind(...).
1245         return local;
1246     }
1247 
1248     @Override
1249     protected final SocketAddress remoteAddress0() {
             // Returns the stored remote address; within this part of the file it is assigned by computeRemote().
1250         return remote;
1251     }
1252 
1253     private static boolean isAllowHalfClosure(ChannelConfig config) {
1254         return config instanceof SocketChannelConfig &&
1255                ((SocketChannelConfig) config).isAllowHalfClosure();
1256     }
1257 
1258     private void cancelConnectTimeoutFuture() {
1259         if (connectTimeoutFuture != null) {
1260             connectTimeoutFuture.cancel(false);
1261             connectTimeoutFuture = null;
1262         }
1263     }
1264 
1265     private void computeRemote() {
1266         if (requestedRemoteAddress instanceof InetSocketAddress) {
1267             remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1268         }
1269     }
1270 
1271     private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1272         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1273     }
1274 
1275     /**
1276      * Returns whether the socket was guaranteed to be empty at the time the submitted io was executed and the
1277      * completion event was created.
1278      * @param flags     the flags that were part of the completion
1279      * @return          {@code true} if empty.
1280      */
1281     protected abstract boolean socketIsEmpty(int flags);
1282 
1283     abstract boolean isPollInFirst();
         // NOTE(review): no caller is visible in this part of the file. Presumably this tells whether a POLLIN
         // should be armed before the first read is scheduled; confirm against the implementations.
1284 }