1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.DomainSocketAddress;
43 import io.netty.channel.unix.Errors;
44 import io.netty.channel.unix.FileDescriptor;
45 import io.netty.channel.unix.UnixChannel;
46 import io.netty.channel.unix.UnixChannelUtil;
47 import io.netty.util.ReferenceCountUtil;
48 import io.netty.util.concurrent.PromiseNotifier;
49 import io.netty.util.internal.CleanableDirectBuffer;
50 import io.netty.util.internal.StringUtil;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.InetSocketAddress;
56 import java.net.SocketAddress;
57 import java.nio.ByteBuffer;
58 import java.nio.channels.AlreadyConnectedException;
59 import java.nio.channels.ClosedChannelException;
60 import java.nio.channels.ConnectionPendingException;
61 import java.nio.channels.NotYetConnectedException;
62 import java.nio.channels.UnresolvedAddressException;
63 import java.util.concurrent.ScheduledFuture;
64 import java.util.concurrent.TimeUnit;
65
66 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
67 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
68 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
69 import static io.netty.util.internal.ObjectUtil.checkNotNull;
70 import static io.netty.util.internal.StringUtil.className;
71
72
73 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
// Logger used for trace output of this channel implementation.
74 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
// The socket backing this channel; also returned by fd().
75 final LinuxSocket socket;
// Value returned by isActive(); set by the constructors and on connect success, cleared in doClose().
76 protected volatile boolean active;
77
78
// Bit flags stored in ioState. A flag is set when the matching operation was submitted and cleared again
// when its completion is handled.
79 private static final int POLL_IN_SCHEDULED = 1;
80 private static final int POLL_OUT_SCHEDULED = 1 << 2;
81 private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
82 private static final int WRITE_SCHEDULED = 1 << 4;
83 private static final int READ_SCHEDULED = 1 << 5;
84 private static final int CONNECT_SCHEDULED = 1 << 6;
85
// Counter backing nextOpsId(); nextOpsId() never hands out 0.
86 private short opsId = Short.MIN_VALUE;
87
// Ids returned by IoRegistration.submit(...) for the poll / connect operations. They are passed to the
// async-cancel operations in cancelOps(...); 0 is treated as "nothing to cancel".
88 private long pollInId;
89 private long pollOutId;
90 private long pollRdhupId;
91 private long connectId;
92
93
// Bitmask built from the *_SCHEDULED flags above.
94 private byte ioState;
95
96
97
98
// Number of write operations scheduled and not yet completed (decremented in writeComplete(...)).
99 private short numOutstandingWrites;
100
// Number of read operations scheduled and not yet completed; readComplete(...) treats -1 as a multishot read.
101 private short numOutstandingReads;
102
// Set by doBeginRead() to record that a read was requested.
103 private boolean readPending;
// True while readComplete(...) is executing.
104 private boolean inReadComplete;
// Set from the IORING_CQE_F_SOCK_NONEMPTY completion flag, or when POLLIN fires while no read is pending.
105 private boolean socketHasMoreData;
106
107 private static final class DelayedClose {
108 private final ChannelPromise promise;
109 private final Throwable cause;
110 private final ClosedChannelException closeCause;
111
112 DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
113 this.promise = promise;
114 this.cause = cause;
115 this.closeCause = closeCause;
116 }
117 }
// Non-null once close(...) was requested on the unsafe; holds the arguments for the eventual close.
118 private DelayedClose delayedClose;
// Once true, doBeginRead() / doBeginReadNow() return without scheduling anything.
119 private boolean inputClosedSeenErrorOnRead;
120
121
122
123
// Promise of the connect attempt currently in flight, or null if there is none.
124 private ChannelPromise connectPromise;
// Scheduled connect timeout task; only created when the configured connect timeout is > 0.
125 private ScheduledFuture<?> connectTimeoutFuture;
// The remote address passed to connect(...).
126 private SocketAddress requestedRemoteAddress;
// Owner of remoteAddressMemory; cleaned in freeRemoteAddressMemory().
127 private CleanableDirectBuffer cleanable;
// Direct buffer holding the encoded socket address handed to the connect operation.
128 private ByteBuffer remoteAddressMemory;
// Message header memory used for the sendmsg-based connect path (TCP_FASTOPEN_CONNECT).
129 private MsgHdrMemoryArray msgHdrMemoryArray;
130
// Assigned in doRegister(...) once registering with the event loop succeeded.
131 private IoRegistration registration;
132
// Cached addresses returned by localAddress0() / remoteAddress0().
133 private volatile SocketAddress local;
134 private volatile SocketAddress remote;
135
136 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
137 super(parent);
138 this.socket = checkNotNull(socket, "fd");
139
140 if (active) {
141
142
143 this.active = true;
144 this.local = socket.localAddress();
145 this.remote = socket.remoteAddress();
146 }
147
148 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
149 }
150
/**
 * Creates an already active channel whose remote address is supplied by the caller.
 *
 * @param parent the parent channel, handed to the super constructor
 * @param fd     the socket backing this channel; must not be {@code null}
 * @param remote the remote address to cache; the local address is read from the socket
 */
AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    active = true;
    this.remote = remote;
    local = fd.localAddress();
}
161
162
163 final void autoReadCleared() {
164 if (!isRegistered()) {
165 return;
166 }
167 IoRegistration registration = this.registration;
168 if (registration == null || !registration.isValid()) {
169 return;
170 }
171 if (eventLoop().inEventLoop()) {
172 clearRead();
173 } else {
174 eventLoop().execute(this::clearRead);
175 }
176 }
177
178 private void clearRead() {
179 assert eventLoop().inEventLoop();
180 readPending = false;
181 IoRegistration registration = this.registration;
182 if (registration == null || !registration.isValid()) {
183 return;
184 }
185
186 cancelOutstandingReads(registration(), numOutstandingReads);
187 }
188
189
190
191
192
193
194 protected final short nextOpsId() {
195 short id = opsId++;
196
197
198 if (id == 0) {
199 id = opsId++;
200 }
201 return id;
202 }
203
204 public final boolean isOpen() {
205 return socket.isOpen();
206 }
207
208 @Override
209 public boolean isActive() {
210 return active;
211 }
212
213 @Override
214 public final FileDescriptor fd() {
215 return socket;
216 }
217
218 private AbstractUringUnsafe ioUringUnsafe() {
219 return (AbstractUringUnsafe) unsafe();
220 }
221
222 @Override
223 protected boolean isCompatible(final EventLoop loop) {
224 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
225 }
226
227 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
228 return newDirectBuffer(buf, buf);
229 }
230
231 protected boolean allowMultiShotPollIn() {
232 return IoUring.isPollAddMultishotEnabled();
233 }
234
235 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
236 final int readableBytes = buf.readableBytes();
237 if (readableBytes == 0) {
238 ReferenceCountUtil.release(holder);
239 return Unpooled.EMPTY_BUFFER;
240 }
241
242 final ByteBufAllocator alloc = alloc();
243 if (alloc.isDirectBufferPooled()) {
244 return newDirectBuffer0(holder, buf, alloc, readableBytes);
245 }
246
247 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
248 if (directBuf == null) {
249 return newDirectBuffer0(holder, buf, alloc, readableBytes);
250 }
251
252 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
253 ReferenceCountUtil.safeRelease(holder);
254 return directBuf;
255 }
256
257 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
258 final ByteBuf directBuf = alloc.directBuffer(capacity);
259 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
260 ReferenceCountUtil.safeRelease(holder);
261 return directBuf;
262 }
263
264
265
266
267
268
269
/**
 * Cancels read operations that are still outstanding. Invoked by this class when the pending read is
 * cleared, when operations are cancelled on close / deregister, and from the read completion handling.
 *
 * @param registration        the registration of this channel
 * @param numOutstandingReads the current value of the outstanding-read counter ({@code -1} is used by this
 *                            class to denote a multishot read)
 */
270 protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
271
272
273
274
275
276
277
/**
 * Cancels write operations that are still outstanding. Invoked by this class when operations are cancelled
 * on close / deregister.
 *
 * @param registration         the registration of this channel
 * @param numOutstandingWrites the current value of the outstanding-write counter
 */
278 protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
279
280 @Override
281 protected void doDisconnect() throws Exception {
282 }
283
284 private void freeRemoteAddressMemory() {
285 if (remoteAddressMemory != null) {
286 cleanable.clean();
287 cleanable = null;
288 remoteAddressMemory = null;
289 }
290 }
291
292 private void freeMsgHdrArray() {
293 if (msgHdrMemoryArray != null) {
294 msgHdrMemoryArray.release();
295 msgHdrMemoryArray = null;
296 }
297 }
298
299 @Override
300 protected void doClose() throws Exception {
301 active = false;
302
303 if (registration != null) {
304 if (socket.markClosed()) {
305 int fd = fd().intValue();
306 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
307 registration.submit(ops);
308 }
309 } else {
310
311 socket.close();
312 ioUringUnsafe().unregistered();
313 }
314 }
315
316 @Override
317 protected final void doBeginRead() {
318 if (inputClosedSeenErrorOnRead) {
319
320 return;
321 }
322 if (readPending) {
323
324 return;
325 }
326 readPending = true;
327 if (inReadComplete || !isActive()) {
328
329
330
331 return;
332 }
333 doBeginReadNow();
334 }
335
336 private void doBeginReadNow() {
337 if (inputClosedSeenErrorOnRead) {
338
339 return;
340 }
341 if (!isPollInFirst() ||
342
343
344 socketHasMoreData) {
345
346 ioUringUnsafe().scheduleFirstReadIfNeeded();
347 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
348 ioUringUnsafe().schedulePollIn();
349 }
350 }
351
352 @Override
353 protected void doWrite(ChannelOutboundBuffer in) {
354 scheduleWriteIfNeeded(in, true);
355 }
356
357 protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
358 if ((ioState & WRITE_SCHEDULED) != 0) {
359 return;
360 }
361 if (scheduleWrite(in) > 0) {
362 ioState |= WRITE_SCHEDULED;
363 if (submitAndRunNow && !isWritable()) {
364 submitAndRunNow();
365 }
366 }
367 }
368
369 protected void submitAndRunNow() {
370
371 }
372
373 private int scheduleWrite(ChannelOutboundBuffer in) {
374 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
375 return 0;
376 }
377 if (in == null) {
378 return 0;
379 }
380
381 int msgCount = in.size();
382 if (msgCount == 0) {
383 return 0;
384 }
385 Object msg = in.current();
386
387 if (msgCount > 1 && in.current() instanceof ByteBuf) {
388 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
389 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
390 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
391
392 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
393 } else {
394 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
395 }
396
397 assert numOutstandingWrites > 0;
398 return numOutstandingWrites;
399 }
400
401 protected final IoRegistration registration() {
402 assert registration != null;
403 return registration;
404 }
405
406 private void schedulePollOut() {
407 pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
408 }
409
410 final void schedulePollRdHup() {
411 pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
412 }
413
/**
 * @return {@code true} if this channel is a stream socket; this class uses the result to decide whether
 *         POLLRDHUP is scheduled once a connect attempt succeeded
 */
414 protected abstract boolean isStreamSocket();
415
416 private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
417 assert (ioState & ioMask) == 0;
418 int fd = fd().intValue();
419 IoRegistration registration = registration();
420 IoUringIoOps ops = IoUringIoOps.newPollAdd(
421 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
422 long id = registration.submit(ops);
423 if (id != 0) {
424 ioState |= (byte) ioMask;
425 }
426 return id;
427 }
428
429 final void resetCachedAddresses() {
430 local = socket.localAddress();
431 remote = socket.remoteAddress();
432 }
433
434 protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
// Lazily created wrapper returned by recvBufAllocHandle().
435 private IoUringRecvByteAllocatorHandle allocHandle;
// Set once a close operation completed with a result other than ECANCELED.
436 private boolean closed;
// Result of socketIsEmpty(flags) for the completion currently being handled; reset after readComplete(...).
437 private boolean socketIsEmpty;
// Promise of a deregister(...) call that is completed in unregistered().
438 private ChannelPromise deregisterPromise;
439
440
441
442
443
/**
 * Schedules the write of multiple messages / buffers from the outbound buffer.
 *
 * @param in the outbound buffer to write from
 * @return the number of scheduled write operations; the caller stores it as the outstanding-write count
 *         and asserts that it is positive
 */
444 protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
445
446
447
448
449
/**
 * Schedules the write of a single message.
 *
 * @param msg the message to write
 * @return the number of scheduled write operations; the caller stores it as the outstanding-write count
 *         and asserts that it is positive
 */
450 protected abstract int scheduleWriteSingle(Object msg);
451
/**
 * Dispatches a completion event to the handler matching its opcode. Afterwards a pending delayed close is
 * performed if possible, and the registration is cancelled once no operation is tracked in {@code ioState}
 * anymore and the channel is either closed or not registered.
 *
 * @param registration the registration the event belongs to
 * @param ioEvent      the completion event; cast to {@code IoUringIoEvent}
 */
452 @Override
453 public final void handle(IoRegistration registration, IoEvent ioEvent) {
454 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
455 byte op = event.opcode();
456 int res = event.res();
457 int flags = event.flags();
458 short data = event.data();
459 switch (op) {
// Read-like operations.
460 case Native.IORING_OP_RECV:
461 case Native.IORING_OP_ACCEPT:
462 case Native.IORING_OP_RECVMSG:
463 case Native.IORING_OP_READ:
464 readComplete(op, res, flags, data);
465 break;
// Write-like operations.
466 case Native.IORING_OP_WRITEV:
467 case Native.IORING_OP_SEND:
468 case Native.IORING_OP_SENDMSG:
469 case Native.IORING_OP_WRITE:
470 case Native.IORING_OP_SPLICE:
471 case Native.IORING_OP_SEND_ZC:
472 case Native.IORING_OP_SENDMSG_ZC:
473 writeComplete(op, res, flags, data);
474 break;
475 case Native.IORING_OP_POLL_ADD:
476 pollAddComplete(res, flags, data);
477 break;
478 case Native.IORING_OP_ASYNC_CANCEL:
479 cancelComplete0(op, res, flags, data);
480 break;
481 case Native.IORING_OP_CONNECT:
482 connectComplete(op, res, flags, data);
483
484
// The connect finished, so the memory that was only needed for it can be released.
485 freeMsgHdrArray();
486 freeRemoteAddressMemory();
487 break;
488 case Native.IORING_OP_CLOSE:
// A cancelled close does not count as closed.
489 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
490 if (delayedClose != null) {
491 delayedClose.promise.setSuccess();
492 }
493 closed = true;
494 }
495 break;
496 default:
497 break;
498 }
499
500
501
// The completion just handled may have been the last one a delayed close was waiting for.
502 handleDelayedClosed();
503
504 if (ioState == 0 && (closed || !isRegistered())) {
505
// Nothing is scheduled anymore and the channel is done with this registration.
506 registration.cancel();
507 }
508 }
509
510 @Override
511 public void unregistered() {
512 freeMsgHdrArray();
513 freeRemoteAddressMemory();
514
515
516 if (deregisterPromise != null) {
517 ChannelPromise promise = deregisterPromise;
518 deregisterPromise = null;
519 promise.setSuccess();
520 }
521 }
522
523 private void handleDelayedClosed() {
524 if (delayedClose != null && canCloseNow()) {
525 closeNow();
526 }
527 }
528
529 private void pollAddComplete(int res, int flags, short data) {
530 if ((res & Native.POLLOUT) != 0) {
531 pollOut(res);
532 }
533 if ((res & Native.POLLIN) != 0) {
534 pollIn(res, flags, data);
535 }
536 if ((res & Native.POLLRDHUP) != 0) {
537 pollRdHup(res);
538 }
539 }
540
541 @Override
542 public final void close() throws Exception {
543 close(voidPromise());
544 }
545
/**
 * Requests a close of the channel. The request is recorded in {@code delayedClose}, all outstanding
 * operations are cancelled, and the actual close is performed immediately only when no read or write is
 * scheduled anymore; otherwise it is performed later from the completion handling.
 *
 * @param promise    notified about the outcome of the close
 * @param cause      the cause passed on to the super implementation
 * @param closeCause the closed-channel exception passed on to the super implementation
 */
546 @Override
547 protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
548 if (closeFuture().isDone()) {
549
// Already closed, just complete the promise.
550 safeSetSuccess(promise);
551 return;
552 }
553 if (delayedClose == null) {
554
555
556
// First close request: remember its arguments. A void promise is replaced by a real one.
557 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
558 } else {
// A close is already in progress, only chain the new promise to the existing one.
559 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
560 return;
561 }
562
563 boolean cancelConnect = false;
564 try {
565 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
566 if (connectPromise != null) {
567
// Fail a connect attempt that is still in flight.
568 connectPromise.tryFailure(new ClosedChannelException());
569 AbstractIoUringChannel.this.connectPromise = null;
570 cancelConnect = true;
571 }
572
573 cancelConnectTimeoutFuture();
574 } finally {
575
576
// Always cancel what is outstanding, even if the code above threw.
577 cancelOps(cancelConnect);
578 }
579
580 if (canCloseNow()) {
581
582 closeNow();
583 }
584 }
585
/**
 * Submits cancellations for the scheduled poll operations (and optionally the connect) and asks the
 * subclass to cancel outstanding reads and writes.
 *
 * @param cancelConnect whether an outstanding connect should be cancelled as well
 * @return {@code true} if at least one cancel was submitted or reads / writes were outstanding;
 *         {@code false} otherwise, including when there is no valid registration
 */
586 private boolean cancelOps(boolean cancelConnect) {
587 if (registration == null || !registration.isValid()) {
588 return false;
589 }
590 boolean cancelled = false;
591 byte flags = (byte) 0;
// Each poll is only cancelled when it is marked as scheduled and its id is known; the id is reset after.
592 if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
593 long id = registration.submit(
594 IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
595 assert id != 0;
596 pollRdhupId = 0;
597 cancelled = true;
598 }
599 if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
600 long id = registration.submit(
601 IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
602 assert id != 0;
603 pollInId = 0;
604 cancelled = true;
605 }
606 if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
607 long id = registration.submit(
608 IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
609 assert id != 0;
610 pollOutId = 0;
611 cancelled = true;
612 }
613 if (cancelConnect && connectId != 0) {
614
615 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
616 assert id != 0;
617 connectId = 0;
618 cancelled = true;
619 }
// Outstanding reads / writes count as "cancelled" too; the subclass hooks are invoked unconditionally.
620 if (numOutstandingReads != 0 || numOutstandingWrites != 0) {
621 cancelled = true;
622 }
623 cancelOutstandingReads(registration, numOutstandingReads);
624 cancelOutstandingWrites(registration, numOutstandingWrites);
625 return cancelled;
626 }
627
628 private boolean canCloseNow() {
629
630
631 return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
632 }
633
634 protected boolean canCloseNow0() {
635 return true;
636 }
637
638 private void closeNow() {
639 super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
640 }
641
642 @Override
643 protected final void flush0() {
644
645
646
647 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
648 super.flush0();
649 }
650 }
651
652 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
653 if (promise == null) {
654
655 return;
656 }
657
658
659 promise.tryFailure(cause);
660 closeIfClosed();
661 }
662
/**
 * Completes a successful connect: marks the channel active, caches the addresses, schedules POLLRDHUP for
 * stream sockets, completes the promise and fires {@code channelActive} if the channel just became active.
 *
 * @param promise   the connect promise; nothing happens when it is {@code null}
 * @param wasActive whether the channel was already active before the connect completed
 */
663 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
664 if (promise == null) {
665
666 return;
667 }
668 active = true;
669
670 if (local == null) {
671 local = socket.localAddress();
672 }
673 computeRemote();
674
675 if (isStreamSocket()) {
676
677 schedulePollRdHup();
678 }
679
680
681
// Capture the state before completing the promise; listeners run by trySuccess() may change it.
682 boolean active = isActive();
683
684
685 boolean promiseSet = promise.trySuccess();
686
687
688
689 if (!wasActive && active) {
690 pipeline().fireChannelActive();
691 }
692
693
// The promise could not be completed by us, so the channel is closed.
694 if (!promiseSet) {
695 close(voidPromise());
696 }
697 }
698
699 @Override
700 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
701 if (allocHandle == null) {
702 allocHandle = new IoUringRecvByteAllocatorHandle(
703 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
704 }
705 return allocHandle;
706 }
707
/**
 * Shuts down the input side of the socket. With half-closure allowed the input is shut down and a
 * {@code ChannelInputShutdownEvent} is fired; without it the channel is closed.
 *
 * @param allDataRead when {@code true} (and not signalled before) a
 *                    {@code ChannelInputShutdownReadComplete} event is fired as well
 */
708 final void shutdownInput(boolean allDataRead) {
709 logger.trace("shutdownInput Fd: {}", fd().intValue());
710 if (!socket.isInputShutdown()) {
711 if (isAllowHalfClosure(config())) {
712 try {
713 socket.shutdown(true, false);
714 } catch (IOException ignored) {
715
716
// The shutdown failed: fire the event and close the channel.
717 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
718 return;
719 } catch (NotYetConnectedException ignore) {
720
721
// Deliberately ignored; the shutdown event is still fired below.
722 }
723 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
724 } else {
725
// No half-closure: mark the input as done so no further read is started, then close.
726 inputClosedSeenErrorOnRead = true;
727 close(voidPromise());
728 return;
729 }
730 }
731 if (allDataRead && !inputClosedSeenErrorOnRead) {
732 inputClosedSeenErrorOnRead = true;
733 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
734 }
735 }
736
737 private void fireEventAndClose(Object evt) {
738 pipeline().fireUserEventTriggered(evt);
739 close(voidPromise());
740 }
741
742 final void schedulePollIn() {
743 assert (ioState & POLL_IN_SCHEDULED) == 0;
744 if (!isActive() || shouldBreakIoUringInReady(config())) {
745 return;
746 }
747 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
748 }
749
/**
 * Handles the completion of a read-like operation: updates the read bookkeeping, delegates the actual
 * processing to {@code readComplete0(...)} and afterwards decides whether another read has to be started
 * or an ongoing multishot read has to be cancelled.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the completion
 * @param flags the completion flags
 * @param data  the data attached to the operation
 */
750 private void readComplete(byte op, int res, int flags, short data) {
751 assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;
752
// -1 marks a multishot read.
753 boolean multishot = numOutstandingReads == -1;
// Without IORING_CQE_F_MORE no further completions follow for this operation.
754 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
755 if (rearm) {
756
757
758 ioState &= ~READ_SCHEDULED;
759 }
// Remember whether a read was requested before the flag is reset below.
760 boolean pending = readPending;
761 if (multishot) {
762
763
764 readPending = false;
765 } else if (--numOutstandingReads == 0) {
766
// That was the last outstanding read.
767 readPending = false;
768 ioState &= ~READ_SCHEDULED;
769 }
// While this flag is set, doBeginRead() only records the request instead of scheduling a read.
770 inReadComplete = true;
771 try {
772 socketIsEmpty = socketIsEmpty(flags);
773 socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
774 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
775 readComplete0(op, res, flags, data, numOutstandingReads);
776 } finally {
777 try {
778
779 if (recvBufAllocHandle().isReadComplete()) {
780
// The handle reports the read as complete, so it is reset.
781 recvBufAllocHandle().reset(config());
782
783
784 if (!multishot) {
785 if (readPending) {
786
787
// A new read was requested while handling the completion.
788 doBeginReadNow();
789 }
790 } else {
791
792
793
794 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
795
796
797
798
// The multishot read was cancelled; restart reading only if a read had been requested.
799 if (pending) {
800 doBeginReadNow();
801 }
802 } else if (rearm) {
803
// No more completions will follow, so reading is started again.
804 doBeginReadNow();
805 } else if (!readPending) {
806
807
// More completions would follow but no read was requested in the meantime: cancel the multishot read.
808 cancelOutstandingReads(registration, numOutstandingReads);
809 }
810 }
811 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
812
813
814
815
816 if (pending) {
817 doBeginReadNow();
818 }
819 } else if (multishot && rearm) {
820
821 doBeginReadNow();
822 }
823 } finally {
824 inReadComplete = false;
825 socketIsEmpty = false;
826 }
827 }
828 }
829
830
831
832
/**
 * Processes the completion of a read-like operation; invoked by this class after its read bookkeeping was
 * updated.
 *
 * @param op                   the opcode of the completed operation
 * @param res                  the result of the completion
 * @param flags                the completion flags
 * @param data                 the data attached to the operation
 * @param outstandingCompletes the outstanding-read counter at the time of the call ({@code -1} is used by
 *                             this class to denote a multishot read)
 */
833 protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
834
835
836
837
838 private void pollRdHup(int res) {
839 ioState &= ~POLL_RDHUP_SCHEDULED;
840 pollRdhupId = 0;
841 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
842 return;
843 }
844
845
846 recvBufAllocHandle().rdHupReceived();
847
848 if (isActive()) {
849 scheduleFirstReadIfNeeded();
850 } else {
851
852 shutdownInput(false);
853 }
854 }
855
856
857
858
859 private void pollIn(int res, int flags, short data) {
860
861 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
862 if (rearm) {
863 ioState &= ~POLL_IN_SCHEDULED;
864 pollInId = 0;
865 }
866 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
867 return;
868 }
869 if (!readPending) {
870
871
872 socketHasMoreData = true;
873 return;
874 }
875 scheduleFirstReadIfNeeded();
876 }
877
878 private void scheduleFirstReadIfNeeded() {
879 if ((ioState & READ_SCHEDULED) == 0) {
880 scheduleFirstRead();
881 }
882 }
883
884 private void scheduleFirstRead() {
885
886 final ChannelConfig config = config();
887 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
888 allocHandle.reset(config);
889 scheduleRead(true);
890 }
891
892 protected final void scheduleRead(boolean first) {
893
894 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
895 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
896 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
897 ioState |= READ_SCHEDULED;
898 }
899 }
900 }
901
902
903
904
905
906
907
908
909
910
911
/**
 * Schedules the read operation(s).
 *
 * @param first         whether this is the first read of a read loop
 * @param socketIsEmpty the value computed by {@code socketIsEmpty(flags)} for the completion currently being
 *                      handled, {@code false} otherwise
 * @return the number of scheduled reads. The caller stores it as the outstanding-read counter and marks a
 *         read as scheduled when it is positive or {@code -1}; {@code -1} is treated as a multishot read.
 */
912 protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
913
914
915
916
917
918
/**
 * Handles a POLLOUT completion. With a connect in flight the connect is finished (or POLLOUT is scheduled
 * again if it is still in progress); otherwise a flush is attempted unless the output was shut down.
 *
 * @param res the result of the poll completion
 */
919 private void pollOut(int res) {
920 ioState &= ~POLL_OUT_SCHEDULED;
921 pollOutId = 0;
922 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
923 return;
924 }
925
// A non-null connectPromise means a connect attempt is still pending.
926 if (connectPromise != null) {
927
928
929
930 assert eventLoop().inEventLoop();
931
932 boolean connectStillInProgress = false;
933 try {
934 boolean wasActive = isActive();
935 if (!socket.finishConnect()) {
936 connectStillInProgress = true;
937 return;
938 }
939 fulfillConnectPromise(connectPromise, wasActive);
940 } catch (Throwable t) {
941 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
942 } finally {
943 if (!connectStillInProgress) {
944
945
946
// The attempt is over (success or failure): drop the timeout and the promise.
947 cancelConnectTimeoutFuture();
948 connectPromise = null;
949 } else {
950
// Not connected yet, wait for the next POLLOUT.
951 schedulePollOut();
952 }
953 }
954 } else if (!socket.isOutputShutdown()) {
955
956 super.flush0();
957 }
958 }
959
960
961
962
963
964
965
966
967
/**
 * Handles the completion of a write-like operation. While a connect is scheduled the completion belongs to
 * the sendmsg that was submitted by {@code connect(...)} and is handled as part of the connect; otherwise
 * the write bookkeeping is updated and, depending on the outcome, POLLOUT or a further write is scheduled.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the completion
 * @param flags the completion flags
 * @param data  the data attached to the operation
 */
968 private void writeComplete(byte op, int res, int flags, short data) {
969 if ((ioState & CONNECT_SCHEDULED) != 0) {
970
971
972 freeMsgHdrArray();
973 if (res > 0) {
974
// Bytes were sent together with the connect: remove them and report a successful connect (result 0).
975 outboundBuffer().removeBytes(res);
976
977
978 connectComplete(op, 0, flags, data);
979 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
980
981
982
983
// Nothing was sent: fall back to a regular connect operation.
984 submitConnect((InetSocketAddress) requestedRemoteAddress);
985 } else {
986
// Any other result is handled like the result of a connect.
987 connectComplete(op, res, flags, data);
988 }
989 return;
990 }
991
// Completions flagged with IORING_CQE_F_NOTIF do not reduce the outstanding-write count.
992 if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
993 assert numOutstandingWrites > 0;
994 --numOutstandingWrites;
995 }
996
997 boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
998 if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
999
1000
// Not everything was written: wait for POLLOUT.
1001 schedulePollOut();
1002 }
1003
1004
1005
// WRITE_SCHEDULED is only cleared after writeComplete0(...) returned and no write is outstanding anymore.
1006 if (numOutstandingWrites == 0) {
1007 ioState &= ~WRITE_SCHEDULED;
1008
1009
1010 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
1011 scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
1012 }
1013 }
1014 }
1015
1016
1017
1018
1019
1020
1021
1022
1023
/**
 * Processes the completion of a write-like operation.
 *
 * @param op          the opcode of the completed operation
 * @param res         the result of the completion
 * @param flags       the completion flags
 * @param data        the data attached to the operation
 * @param outstanding the number of writes still outstanding
 * @return {@code true} if everything was written; when {@code false} is returned the caller schedules a
 *         POLLOUT (unless one is scheduled already)
 */
1024 abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034 void cancelComplete0(byte op, int res, int flags, short data) {
1035
1036 }
1037
1038
1039
1040
1041
1042
1043
1044
/**
 * Handles the result of a connect attempt. {@code EINPROGRESS} / {@code EALREADY} lead to waiting for
 * POLLOUT; {@code 0} completes the connect promise successfully; any other result fails it.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the connect
 * @param flags the completion flags
 * @param data  the data attached to the operation
 */
1045 void connectComplete(byte op, int res, int flags, short data) {
1046 ioState &= ~CONNECT_SCHEDULED;
1047 freeRemoteAddressMemory();
1048
1049 if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1050
// The connect has not finished yet; pollOut(...) completes it later.
1051 schedulePollOut();
1052 } else {
1053 try {
1054 if (res == 0) {
1055 fulfillConnectPromise(connectPromise, active);
// A read requested while connecting is started now.
1056 if (readPending) {
1057 doBeginReadNow();
1058 }
1059 } else {
1060 try {
// Turns the error code into an exception that is then used to fail the promise.
1061 Errors.throwConnectException("io_uring connect", res);
1062 } catch (Throwable cause) {
1063 fulfillConnectPromise(connectPromise, cause);
1064 }
1065 }
1066 } finally {
1067
1068
1069
// The attempt is over (success or failure): drop the timeout and the promise.
1070 cancelConnectTimeoutFuture();
1071 connectPromise = null;
1072 }
1073 }
1074 }
1075
/**
 * Starts a connect attempt to {@code remoteAddress}. After validating the state and the addresses (and
 * binding to {@code localAddress} if given) either a connect operation or, for TCP fast-open with data
 * available, a sendmsg operation is submitted. A timeout task is scheduled when a connect timeout is
 * configured, and cancelling the promise closes the channel.
 *
 * @param remoteAddress the address to connect to; an {@code InetSocketAddress} or {@code DomainSocketAddress}
 * @param localAddress  the address to bind to first, or {@code null}
 * @param promise       notified about the outcome; failed right away on validation or submission errors
 */
1076 @Override
1077 public void connect(
1078 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
1079
1080
1081 if (promise.isDone() || !ensureOpen(promise)) {
1082 return;
1083 }
1084
// A close was already requested, refuse to connect.
1085 if (delayedClose != null) {
1086 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
1087 return;
1088 }
1089 try {
1090 if (connectPromise != null) {
1091 throw new ConnectionPendingException();
1092 }
1093 if (localAddress instanceof InetSocketAddress) {
1094 checkResolvable((InetSocketAddress) localAddress);
1095 }
1096
1097 if (remoteAddress instanceof InetSocketAddress) {
1098 checkResolvable((InetSocketAddress) remoteAddress);
1099 }
1100
// A cached remote address means the channel is connected already.
1101 if (remote != null) {
1102
1103
1104
1105 throw new AlreadyConnectedException();
1106 }
1107
1108 if (localAddress != null) {
1109 socket.bind(localAddress);
1110 }
1111
1112 if (remoteAddress instanceof InetSocketAddress) {
1113 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
1114 ByteBuf initialData = null;
// With TCP_FASTOPEN_CONNECT enabled, the first flushed ByteBuf (if any) is sent together with the connect.
1115 if (IoUring.isTcpFastOpenClientSideAvailable() &&
1116 config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
1117 ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
1118 outbound.addFlush();
1119 Object curr;
1120 if ((curr = outbound.current()) instanceof ByteBuf) {
1121 initialData = (ByteBuf) curr;
1122 }
1123 }
1124 if (initialData != null) {
1125 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
1126 MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
1127 hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
1128 initialData.readableBytes(), (short) 0);
1129
1130 int fd = fd().intValue();
1131 IoRegistration registration = registration();
1132 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
1133 hdr.address(), hdr.idx());
1134 connectId = registration.submit(ops);
1135 if (connectId == 0) {
1136
// The submission was not accepted, release the header memory right away.
1137 freeMsgHdrArray();
1138 }
1139 } else {
1140 submitConnect(inetSocketAddress);
1141 }
1142 } else if (remoteAddress instanceof DomainSocketAddress) {
1143 DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
1144 submitConnect(unixDomainSocketAddress);
1145 } else {
1146 throw new Error("Unexpected SocketAddress implementation " + className(remoteAddress));
1147 }
1148
// Only track the connect as scheduled when the submission returned an id.
1149 if (connectId != 0) {
1150 ioState |= CONNECT_SCHEDULED;
1151 }
1152 } catch (Throwable t) {
1153 closeIfClosed();
1154 promise.tryFailure(annotateConnectException(t, remoteAddress));
1155 return;
1156 }
1157 connectPromise = promise;
1158 requestedRemoteAddress = remoteAddress;
1159
1160 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1161 if (connectTimeoutMillis > 0) {
// Fails the connect promise with a ConnectTimeoutException and closes the channel if the attempt is
// still pending when the timeout fires.
1162 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
1163 @Override
1164 public void run() {
1165 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
1166 if (connectPromise != null && !connectPromise.isDone() &&
1167 connectPromise.tryFailure(new ConnectTimeoutException(
1168 "connection timed out: " + remoteAddress))) {
1169 close(voidPromise());
1170 }
1171 }
1172 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1173 }
1174
1175 promise.addListener(new ChannelFutureListener() {
1176 @Override
1177 public void operationComplete(ChannelFuture future) {
1178
1179
// A cancelled connect also cancels the timeout and closes the channel.
1180 if (future.isCancelled()) {
1181 cancelConnectTimeoutFuture();
1182 connectPromise = null;
1183 close(voidPromise());
1184 }
1185 }
1186 });
1187 }
1188
1189 @Override
1190 public final void deregister(ChannelPromise promise) {
1191 if (deregisterPromise != null) {
1192
1193 PromiseNotifier.cascade(deregisterPromise, promise);
1194 } else if (!isRegistered()) {
1195 promise.setSuccess();
1196 } else {
1197
1198
1199 deregisterPromise = promise;
1200 super.deregister(newPromise().addListener(f -> {
1201 if (!f.isSuccess()) {
1202 this.deregisterPromise = null;
1203 promise.setFailure(f.cause());
1204 }
1205 }));
1206 }
1207 }
1208 }
1209
1210 private void submitConnect(InetSocketAddress inetSocketAddress) {
1211 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1212 remoteAddressMemory = cleanable.buffer();
1213
1214 SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1215
1216 int fd = fd().intValue();
1217 IoRegistration registration = registration();
1218 IoUringIoOps ops = IoUringIoOps.newConnect(
1219 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1220 connectId = registration.submit(ops);
1221 if (connectId == 0) {
1222
1223 freeRemoteAddressMemory();
1224 }
1225 }
1226
1227 private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1228 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1229 remoteAddressMemory = cleanable.buffer();
1230 int addrLen = SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1231 int fd = fd().intValue();
1232 IoRegistration registration = registration();
1233 long addr = Buffer.memoryAddress(remoteAddressMemory);
1234 IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, addrLen, nextOpsId());
1235 connectId = registration.submit(ops);
1236 if (connectId == 0) {
1237
1238 freeRemoteAddressMemory();
1239 }
1240 }
1241
1242 @Override
1243 protected Object filterOutboundMessage(Object msg) {
1244 if (msg instanceof ByteBuf) {
1245 ByteBuf buf = (ByteBuf) msg;
1246 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1247 }
1248 throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
1249 }
1250
1251 @Override
1252 protected void doRegister(ChannelPromise promise) {
1253 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1254 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1255 if (f.isSuccess()) {
1256 registration = (IoRegistration) f.getNow();
1257 promise.setSuccess();
1258 } else {
1259 promise.setFailure(f.cause());
1260 }
1261 });
1262 }
1263
1264 @Override
1265 protected final void doDeregister() {
1266
1267 if (!ioUringUnsafe().cancelOps(connectPromise != null)) {
1268
1269
1270 if (registration != null) {
1271 registration.cancel();
1272 }
1273 }
1274 }
1275
1276 @Override
1277 protected void doBind(final SocketAddress local) throws Exception {
1278 if (local instanceof InetSocketAddress) {
1279 checkResolvable((InetSocketAddress) local);
1280 }
1281 socket.bind(local);
1282 this.local = socket.localAddress();
1283 }
1284
1285 protected static void checkResolvable(InetSocketAddress addr) {
1286 if (addr.isUnresolved()) {
1287 throw new UnresolvedAddressException();
1288 }
1289 }
1290
1291 @Override
1292 protected final SocketAddress localAddress0() {
1293 return local;
1294 }
1295
1296 @Override
1297 protected final SocketAddress remoteAddress0() {
1298 return remote;
1299 }
1300
1301 private static boolean isAllowHalfClosure(ChannelConfig config) {
1302 return config instanceof SocketChannelConfig &&
1303 ((SocketChannelConfig) config).isAllowHalfClosure();
1304 }
1305
1306 private void cancelConnectTimeoutFuture() {
1307 if (connectTimeoutFuture != null) {
1308 connectTimeoutFuture.cancel(false);
1309 connectTimeoutFuture = null;
1310 }
1311 }
1312
1313 private void computeRemote() {
1314 if (requestedRemoteAddress instanceof InetSocketAddress) {
1315 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1316 }
1317 }
1318
1319 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1320 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1321 }
1322
1323
1324
1325
1326
1327
1328
/**
 * Decides, based on the flags of a read completion, whether the socket is considered empty. The result is
 * handed to {@code scheduleRead0(...)} for reads scheduled while that completion is handled.
 *
 * @param flags the completion flags
 * @return {@code true} if the socket is considered empty
 */
1329 protected abstract boolean socketIsEmpty(int flags);
1330
/**
 * @return {@code true} if a POLLIN should be scheduled before reading. When {@code false} (or when the
 *         socket is known to have more data) this class schedules the read directly.
 */
1331 abstract boolean isPollInFirst();
1332 }