1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.DomainSocketAddress;
43 import io.netty.channel.unix.Errors;
44 import io.netty.channel.unix.FileDescriptor;
45 import io.netty.channel.unix.UnixChannel;
46 import io.netty.channel.unix.UnixChannelUtil;
47 import io.netty.util.ReferenceCountUtil;
48 import io.netty.util.concurrent.PromiseNotifier;
49 import io.netty.util.internal.CleanableDirectBuffer;
50 import io.netty.util.internal.logging.InternalLogger;
51 import io.netty.util.internal.logging.InternalLoggerFactory;
52
53 import java.io.IOException;
54 import java.net.InetSocketAddress;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.AlreadyConnectedException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.ConnectionPendingException;
60 import java.nio.channels.NotYetConnectedException;
61 import java.nio.channels.UnresolvedAddressException;
62 import java.util.concurrent.ScheduledFuture;
63 import java.util.concurrent.TimeUnit;
64
65 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
66 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
67 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
68 import static io.netty.util.internal.ObjectUtil.checkNotNull;
69
70
71 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
72 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
73 final LinuxSocket socket;
74 protected volatile boolean active;
75
76
77 private static final int POLL_IN_SCHEDULED = 1;
78 private static final int POLL_OUT_SCHEDULED = 1 << 2;
79 private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
80 private static final int WRITE_SCHEDULED = 1 << 4;
81 private static final int READ_SCHEDULED = 1 << 5;
82 private static final int CONNECT_SCHEDULED = 1 << 6;
83
84 private short opsId = Short.MIN_VALUE;
85
86 private long pollInId;
87 private long pollOutId;
88 private long pollRdhupId;
89 private long connectId;
90
91
92 private byte ioState;
93
94
95
96
97 private short numOutstandingWrites;
98
99 private short numOutstandingReads;
100
101 private boolean readPending;
102 private boolean inReadComplete;
103 private boolean socketHasMoreData;
104
105 private static final class DelayedClose {
106 private final ChannelPromise promise;
107 private final Throwable cause;
108 private final ClosedChannelException closeCause;
109
110 DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
111 this.promise = promise;
112 this.cause = cause;
113 this.closeCause = closeCause;
114 }
115 }
116 private DelayedClose delayedClose;
117 private boolean inputClosedSeenErrorOnRead;
118
119
120
121
122 private ChannelPromise connectPromise;
123 private ScheduledFuture<?> connectTimeoutFuture;
124 private SocketAddress requestedRemoteAddress;
125 private CleanableDirectBuffer cleanable;
126 private ByteBuffer remoteAddressMemory;
127 private MsgHdrMemoryArray msgHdrMemoryArray;
128
129 private IoRegistration registration;
130
131 private volatile SocketAddress local;
132 private volatile SocketAddress remote;
133
134 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
135 super(parent);
136 this.socket = checkNotNull(socket, "fd");
137
138 if (active) {
139
140
141 this.active = true;
142 this.local = socket.localAddress();
143 this.remote = socket.remoteAddress();
144 }
145
146 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
147 }
148
149 AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
150 super(parent);
151 this.socket = checkNotNull(fd, "fd");
152 this.active = true;
153
154
155
156 this.remote = remote;
157 this.local = fd.localAddress();
158 }
159
160
161 final void autoReadCleared() {
162 if (!isRegistered()) {
163 return;
164 }
165 IoRegistration registration = this.registration;
166 if (registration == null || !registration.isValid()) {
167 return;
168 }
169 if (eventLoop().inEventLoop()) {
170 clearRead();
171 } else {
172 eventLoop().execute(this::clearRead);
173 }
174 }
175
176 private void clearRead() {
177 assert eventLoop().inEventLoop();
178 readPending = false;
179 IoRegistration registration = this.registration;
180 if (registration == null || !registration.isValid()) {
181 return;
182 }
183
184 cancelOutstandingReads(registration(), numOutstandingReads);
185 }
186
187
188
189
190
191
192 protected final short nextOpsId() {
193 short id = opsId++;
194
195
196 if (id == 0) {
197 id = opsId++;
198 }
199 return id;
200 }
201
202 public final boolean isOpen() {
203 return socket.isOpen();
204 }
205
206 @Override
207 public boolean isActive() {
208 return active;
209 }
210
211 @Override
212 public final FileDescriptor fd() {
213 return socket;
214 }
215
216 private AbstractUringUnsafe ioUringUnsafe() {
217 return (AbstractUringUnsafe) unsafe();
218 }
219
220 @Override
221 protected boolean isCompatible(final EventLoop loop) {
222 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
223 }
224
225 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
226 return newDirectBuffer(buf, buf);
227 }
228
229 protected boolean allowMultiShotPollIn() {
230 return IoUring.isPollAddMultishotEnabled();
231 }
232
233 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
234 final int readableBytes = buf.readableBytes();
235 if (readableBytes == 0) {
236 ReferenceCountUtil.release(holder);
237 return Unpooled.EMPTY_BUFFER;
238 }
239
240 final ByteBufAllocator alloc = alloc();
241 if (alloc.isDirectBufferPooled()) {
242 return newDirectBuffer0(holder, buf, alloc, readableBytes);
243 }
244
245 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
246 if (directBuf == null) {
247 return newDirectBuffer0(holder, buf, alloc, readableBytes);
248 }
249
250 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
251 ReferenceCountUtil.safeRelease(holder);
252 return directBuf;
253 }
254
255 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
256 final ByteBuf directBuf = alloc.directBuffer(capacity);
257 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
258 ReferenceCountUtil.safeRelease(holder);
259 return directBuf;
260 }
261
262
263
264
265
266
267
268 protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
269
270
271
272
273
274
275
276 protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
277
278 @Override
279 protected void doDisconnect() throws Exception {
280 }
281
282 private void freeRemoteAddressMemory() {
283 if (remoteAddressMemory != null) {
284 cleanable.clean();
285 cleanable = null;
286 remoteAddressMemory = null;
287 }
288 }
289
290 private void freeMsgHdrArray() {
291 if (msgHdrMemoryArray != null) {
292 msgHdrMemoryArray.release();
293 msgHdrMemoryArray = null;
294 }
295 }
296
297 @Override
298 protected void doClose() throws Exception {
299 active = false;
300
301 if (registration != null) {
302 if (socket.markClosed()) {
303 int fd = fd().intValue();
304 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
305 registration.submit(ops);
306 }
307 } else {
308
309 socket.close();
310 ioUringUnsafe().freeResourcesNowIfNeeded(null);
311 }
312 }
313
314 @Override
315 protected final void doBeginRead() {
316 if (inputClosedSeenErrorOnRead) {
317
318 return;
319 }
320 if (readPending) {
321
322 return;
323 }
324 readPending = true;
325 if (inReadComplete || !isActive()) {
326
327
328
329 return;
330 }
331 doBeginReadNow();
332 }
333
334 private void doBeginReadNow() {
335 if (inputClosedSeenErrorOnRead) {
336
337 return;
338 }
339 if (!isPollInFirst() ||
340
341
342 socketHasMoreData) {
343
344 ioUringUnsafe().scheduleFirstReadIfNeeded();
345 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
346 ioUringUnsafe().schedulePollIn();
347 }
348 }
349
350 @Override
351 protected void doWrite(ChannelOutboundBuffer in) {
352 scheduleWriteIfNeeded(in, true);
353 }
354
355 protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
356 if ((ioState & WRITE_SCHEDULED) != 0) {
357 return;
358 }
359 if (scheduleWrite(in) > 0) {
360 ioState |= WRITE_SCHEDULED;
361 if (submitAndRunNow && !isWritable()) {
362 submitAndRunNow();
363 }
364 }
365 }
366
367 protected void submitAndRunNow() {
368
369 }
370
371 private int scheduleWrite(ChannelOutboundBuffer in) {
372 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
373 return 0;
374 }
375 if (in == null) {
376 return 0;
377 }
378
379 int msgCount = in.size();
380 if (msgCount == 0) {
381 return 0;
382 }
383 Object msg = in.current();
384
385 if (msgCount > 1 && in.current() instanceof ByteBuf) {
386 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
387 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
388 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
389
390 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
391 } else {
392 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
393 }
394
395 assert numOutstandingWrites > 0;
396 return numOutstandingWrites;
397 }
398
399 protected final IoRegistration registration() {
400 assert registration != null;
401 return registration;
402 }
403
404 private void schedulePollOut() {
405 pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
406 }
407
408 final void schedulePollRdHup() {
409 pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
410 }
411
412 private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
413 assert (ioState & ioMask) == 0;
414 int fd = fd().intValue();
415 IoRegistration registration = registration();
416 IoUringIoOps ops = IoUringIoOps.newPollAdd(
417 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
418 long id = registration.submit(ops);
419 if (id != 0) {
420 ioState |= (byte) ioMask;
421 }
422 return id;
423 }
424
425 final void resetCachedAddresses() {
426 local = socket.localAddress();
427 remote = socket.remoteAddress();
428 }
429
430 protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
431 private IoUringRecvByteAllocatorHandle allocHandle;
432 private boolean closed;
433 private boolean freed;
434 private boolean socketIsEmpty;
435
436
437
438
439
440 protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
441
442
443
444
445
446 protected abstract int scheduleWriteSingle(Object msg);
447
448 @Override
449 public final void handle(IoRegistration registration, IoEvent ioEvent) {
450 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
451 byte op = event.opcode();
452 int res = event.res();
453 int flags = event.flags();
454 short data = event.data();
455 switch (op) {
456 case Native.IORING_OP_RECV:
457 case Native.IORING_OP_ACCEPT:
458 case Native.IORING_OP_RECVMSG:
459 case Native.IORING_OP_READ:
460 readComplete(op, res, flags, data);
461 break;
462 case Native.IORING_OP_WRITEV:
463 case Native.IORING_OP_SEND:
464 case Native.IORING_OP_SENDMSG:
465 case Native.IORING_OP_WRITE:
466 case Native.IORING_OP_SPLICE:
467 case Native.IORING_OP_SEND_ZC:
468 case Native.IORING_OP_SENDMSG_ZC:
469 writeComplete(op, res, flags, data);
470 break;
471 case Native.IORING_OP_POLL_ADD:
472 pollAddComplete(res, flags, data);
473 break;
474 case Native.IORING_OP_ASYNC_CANCEL:
475 cancelComplete0(op, res, flags, data);
476 break;
477 case Native.IORING_OP_CONNECT:
478 connectComplete(op, res, flags, data);
479
480
481 freeMsgHdrArray();
482 freeRemoteAddressMemory();
483 break;
484 case Native.IORING_OP_CLOSE:
485 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
486 if (delayedClose != null) {
487 delayedClose.promise.setSuccess();
488 }
489 closed = true;
490 }
491 break;
492 default:
493 break;
494 }
495
496
497
498 handleDelayedClosed();
499
500 if (ioState == 0 && closed) {
501 freeResourcesNowIfNeeded(registration);
502 }
503 }
504
505 private void freeResourcesNowIfNeeded(IoRegistration reg) {
506 if (!freed) {
507 freed = true;
508 freeResourcesNow(reg);
509 }
510 }
511
512
513
514
515
516
517 protected void freeResourcesNow(IoRegistration reg) {
518 freeMsgHdrArray();
519 freeRemoteAddressMemory();
520 if (reg != null) {
521 reg.cancel();
522 }
523 }
524
525 private void handleDelayedClosed() {
526 if (delayedClose != null && canCloseNow()) {
527 closeNow();
528 }
529 }
530
531 private void pollAddComplete(int res, int flags, short data) {
532 if ((res & Native.POLLOUT) != 0) {
533 pollOut(res);
534 }
535 if ((res & Native.POLLIN) != 0) {
536 pollIn(res, flags, data);
537 }
538 if ((res & Native.POLLRDHUP) != 0) {
539 pollRdHup(res);
540 }
541 }
542
543 @Override
544 public final void close() throws Exception {
545 close(voidPromise());
546 }
547
548 @Override
549 protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
550 if (closeFuture().isDone()) {
551
552 safeSetSuccess(promise);
553 return;
554 }
555 if (delayedClose == null) {
556
557
558
559 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
560 } else {
561 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
562 return;
563 }
564
565 boolean cancelConnect = false;
566 try {
567 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
568 if (connectPromise != null) {
569
570 connectPromise.tryFailure(new ClosedChannelException());
571 AbstractIoUringChannel.this.connectPromise = null;
572 cancelConnect = true;
573 }
574
575 cancelConnectTimeoutFuture();
576 } finally {
577
578
579 cancelOps(cancelConnect);
580 }
581
582 if (canCloseNow()) {
583
584 closeNow();
585 }
586 }
587
588 private void cancelOps(boolean cancelConnect) {
589 if (registration == null || !registration.isValid()) {
590 return;
591 }
592 byte flags = (byte) 0;
593 if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
594 long id = registration.submit(
595 IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
596 assert id != 0;
597 pollRdhupId = 0;
598 }
599 if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
600 long id = registration.submit(
601 IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
602 assert id != 0;
603 pollInId = 0;
604 }
605 if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
606 long id = registration.submit(
607 IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
608 assert id != 0;
609 pollOutId = 0;
610 }
611 if (cancelConnect && connectId != 0) {
612
613 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
614 assert id != 0;
615 connectId = 0;
616 }
617 cancelOutstandingReads(registration, numOutstandingReads);
618 cancelOutstandingWrites(registration, numOutstandingWrites);
619 }
620
621 private boolean canCloseNow() {
622
623
624 return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
625 }
626
627 protected boolean canCloseNow0() {
628 return true;
629 }
630
631 private void closeNow() {
632 super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
633 }
634
635 @Override
636 protected final void flush0() {
637
638
639
640 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
641 super.flush0();
642 }
643 }
644
645 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
646 if (promise == null) {
647
648 return;
649 }
650
651
652 promise.tryFailure(cause);
653 closeIfClosed();
654 }
655
656 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
657 if (promise == null) {
658
659 return;
660 }
661 active = true;
662
663 if (local == null) {
664 local = socket.localAddress();
665 }
666 computeRemote();
667
668
669 schedulePollRdHup();
670
671
672
673 boolean active = isActive();
674
675
676 boolean promiseSet = promise.trySuccess();
677
678
679
680 if (!wasActive && active) {
681 pipeline().fireChannelActive();
682 }
683
684
685 if (!promiseSet) {
686 close(voidPromise());
687 }
688 }
689
690 @Override
691 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
692 if (allocHandle == null) {
693 allocHandle = new IoUringRecvByteAllocatorHandle(
694 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
695 }
696 return allocHandle;
697 }
698
699 final void shutdownInput(boolean allDataRead) {
700 logger.trace("shutdownInput Fd: {}", fd().intValue());
701 if (!socket.isInputShutdown()) {
702 if (isAllowHalfClosure(config())) {
703 try {
704 socket.shutdown(true, false);
705 } catch (IOException ignored) {
706
707
708 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
709 return;
710 } catch (NotYetConnectedException ignore) {
711
712
713 }
714 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
715 } else {
716
717 inputClosedSeenErrorOnRead = true;
718 close(voidPromise());
719 return;
720 }
721 }
722 if (allDataRead && !inputClosedSeenErrorOnRead) {
723 inputClosedSeenErrorOnRead = true;
724 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
725 }
726 }
727
728 private void fireEventAndClose(Object evt) {
729 pipeline().fireUserEventTriggered(evt);
730 close(voidPromise());
731 }
732
733 final void schedulePollIn() {
734 assert (ioState & POLL_IN_SCHEDULED) == 0;
735 if (!isActive() || shouldBreakIoUringInReady(config())) {
736 return;
737 }
738 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
739 }
740
741 private void readComplete(byte op, int res, int flags, short data) {
742 assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;
743
744 boolean multishot = numOutstandingReads == -1;
745 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
746 if (rearm) {
747
748
749 ioState &= ~READ_SCHEDULED;
750 }
751 boolean pending = readPending;
752 if (multishot) {
753
754
755 readPending = false;
756 } else if (--numOutstandingReads == 0) {
757
758 readPending = false;
759 ioState &= ~READ_SCHEDULED;
760 }
761 inReadComplete = true;
762 try {
763 socketIsEmpty = socketIsEmpty(flags);
764 socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
765 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
766 readComplete0(op, res, flags, data, numOutstandingReads);
767 } finally {
768 try {
769
770 if (recvBufAllocHandle().isReadComplete()) {
771
772 recvBufAllocHandle().reset(config());
773
774
775 if (!multishot) {
776 if (readPending) {
777
778
779 doBeginReadNow();
780 }
781 } else {
782
783
784
785 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
786
787
788
789
790 if (pending) {
791 doBeginReadNow();
792 }
793 } else if (rearm) {
794
795 doBeginReadNow();
796 } else if (!readPending) {
797
798
799 cancelOutstandingReads(registration, numOutstandingReads);
800 }
801 }
802 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
803
804
805
806
807 if (pending) {
808 doBeginReadNow();
809 }
810 } else if (multishot && rearm) {
811
812 doBeginReadNow();
813 }
814 } finally {
815 inReadComplete = false;
816 socketIsEmpty = false;
817 }
818 }
819 }
820
821
822
823
824 protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
825
826
827
828
829 private void pollRdHup(int res) {
830 ioState &= ~POLL_RDHUP_SCHEDULED;
831 pollRdhupId = 0;
832 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
833 return;
834 }
835
836
837 recvBufAllocHandle().rdHupReceived();
838
839 if (isActive()) {
840 scheduleFirstReadIfNeeded();
841 } else {
842
843 shutdownInput(false);
844 }
845 }
846
847
848
849
850 private void pollIn(int res, int flags, short data) {
851
852 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
853 if (rearm) {
854 ioState &= ~POLL_IN_SCHEDULED;
855 pollInId = 0;
856 }
857 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
858 return;
859 }
860 if (!readPending) {
861
862
863 socketHasMoreData = true;
864 return;
865 }
866 scheduleFirstReadIfNeeded();
867 }
868
869 private void scheduleFirstReadIfNeeded() {
870 if ((ioState & READ_SCHEDULED) == 0) {
871 scheduleFirstRead();
872 }
873 }
874
875 private void scheduleFirstRead() {
876
877 final ChannelConfig config = config();
878 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
879 allocHandle.reset(config);
880 scheduleRead(true);
881 }
882
883 protected final void scheduleRead(boolean first) {
884
885 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
886 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
887 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
888 ioState |= READ_SCHEDULED;
889 }
890 }
891 }
892
893
894
895
896
897
898
899
900
901
902
903 protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
904
905
906
907
908
909
910 private void pollOut(int res) {
911 ioState &= ~POLL_OUT_SCHEDULED;
912 pollOutId = 0;
913 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
914 return;
915 }
916
917 if (connectPromise != null) {
918
919
920
921 assert eventLoop().inEventLoop();
922
923 boolean connectStillInProgress = false;
924 try {
925 boolean wasActive = isActive();
926 if (!socket.finishConnect()) {
927 connectStillInProgress = true;
928 return;
929 }
930 fulfillConnectPromise(connectPromise, wasActive);
931 } catch (Throwable t) {
932 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
933 } finally {
934 if (!connectStillInProgress) {
935
936
937
938 cancelConnectTimeoutFuture();
939 connectPromise = null;
940 } else {
941
942 schedulePollOut();
943 }
944 }
945 } else if (!socket.isOutputShutdown()) {
946
947 super.flush0();
948 }
949 }
950
951
952
953
954
955
956
957
958
959 private void writeComplete(byte op, int res, int flags, short data) {
960 if ((ioState & CONNECT_SCHEDULED) != 0) {
961
962
963 freeMsgHdrArray();
964 if (res > 0) {
965
966 outboundBuffer().removeBytes(res);
967
968
969 connectComplete(op, 0, flags, data);
970 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
971
972
973
974
975 submitConnect((InetSocketAddress) requestedRemoteAddress);
976 } else {
977
978 connectComplete(op, res, flags, data);
979 }
980 return;
981 }
982
983 if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
984 assert numOutstandingWrites > 0;
985 --numOutstandingWrites;
986 }
987
988 boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
989 if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
990
991
992 schedulePollOut();
993 }
994
995
996
997 if (numOutstandingWrites == 0) {
998 ioState &= ~WRITE_SCHEDULED;
999
1000
1001 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
1002 scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
1003 }
1004 }
1005 }
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015 abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025 void cancelComplete0(byte op, int res, int flags, short data) {
1026
1027 }
1028
1029
1030
1031
1032
1033
1034
1035
1036 void connectComplete(byte op, int res, int flags, short data) {
1037 ioState &= ~CONNECT_SCHEDULED;
1038 freeRemoteAddressMemory();
1039
1040 if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1041
1042 schedulePollOut();
1043 } else {
1044 try {
1045 if (res == 0) {
1046 fulfillConnectPromise(connectPromise, active);
1047 if (readPending) {
1048 doBeginReadNow();
1049 }
1050 } else {
1051 try {
1052 Errors.throwConnectException("io_uring connect", res);
1053 } catch (Throwable cause) {
1054 fulfillConnectPromise(connectPromise, cause);
1055 }
1056 }
1057 } finally {
1058
1059
1060
1061 cancelConnectTimeoutFuture();
1062 connectPromise = null;
1063 }
1064 }
1065 }
1066
1067 @Override
1068 public void connect(
1069 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
1070
1071
1072 if (promise.isDone() || !ensureOpen(promise)) {
1073 return;
1074 }
1075
1076 if (delayedClose != null) {
1077 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
1078 return;
1079 }
1080 try {
1081 if (connectPromise != null) {
1082 throw new ConnectionPendingException();
1083 }
1084 if (localAddress instanceof InetSocketAddress) {
1085 checkResolvable((InetSocketAddress) localAddress);
1086 }
1087
1088 if (remoteAddress instanceof InetSocketAddress) {
1089 checkResolvable((InetSocketAddress) remoteAddress);
1090 }
1091
1092 if (remote != null) {
1093
1094
1095
1096 throw new AlreadyConnectedException();
1097 }
1098
1099 if (localAddress != null) {
1100 socket.bind(localAddress);
1101 }
1102
1103 if (remoteAddress instanceof InetSocketAddress) {
1104 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
1105 ByteBuf initialData = null;
1106 if (IoUring.isTcpFastOpenClientSideAvailable() &&
1107 config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
1108 ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
1109 outbound.addFlush();
1110 Object curr;
1111 if ((curr = outbound.current()) instanceof ByteBuf) {
1112 initialData = (ByteBuf) curr;
1113 }
1114 }
1115 if (initialData != null) {
1116 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
1117 MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
1118 hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
1119 initialData.readableBytes(), (short) 0);
1120
1121 int fd = fd().intValue();
1122 IoRegistration registration = registration();
1123 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
1124 hdr.address(), hdr.idx());
1125 connectId = registration.submit(ops);
1126 if (connectId == 0) {
1127
1128 freeMsgHdrArray();
1129 }
1130 } else {
1131 submitConnect(inetSocketAddress);
1132 }
1133 } else if (remoteAddress instanceof DomainSocketAddress) {
1134 DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
1135 submitConnect(unixDomainSocketAddress);
1136 } else {
1137 throw new Error("Unexpected SocketAddress implementation " + remoteAddress);
1138 }
1139
1140 if (connectId != 0) {
1141 ioState |= CONNECT_SCHEDULED;
1142 }
1143 } catch (Throwable t) {
1144 closeIfClosed();
1145 promise.tryFailure(annotateConnectException(t, remoteAddress));
1146 return;
1147 }
1148 connectPromise = promise;
1149 requestedRemoteAddress = remoteAddress;
1150
1151 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1152 if (connectTimeoutMillis > 0) {
1153 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
1154 @Override
1155 public void run() {
1156 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
1157 if (connectPromise != null && !connectPromise.isDone() &&
1158 connectPromise.tryFailure(new ConnectTimeoutException(
1159 "connection timed out: " + remoteAddress))) {
1160 close(voidPromise());
1161 }
1162 }
1163 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1164 }
1165
1166 promise.addListener(new ChannelFutureListener() {
1167 @Override
1168 public void operationComplete(ChannelFuture future) {
1169
1170
1171 if (future.isCancelled()) {
1172 cancelConnectTimeoutFuture();
1173 connectPromise = null;
1174 close(voidPromise());
1175 }
1176 }
1177 });
1178 }
1179 }
1180
1181 private void submitConnect(InetSocketAddress inetSocketAddress) {
1182 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1183 remoteAddressMemory = cleanable.buffer();
1184
1185 SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1186
1187 int fd = fd().intValue();
1188 IoRegistration registration = registration();
1189 IoUringIoOps ops = IoUringIoOps.newConnect(
1190 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1191 connectId = registration.submit(ops);
1192 if (connectId == 0) {
1193
1194 freeRemoteAddressMemory();
1195 }
1196 }
1197
1198 private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1199 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1200 remoteAddressMemory = cleanable.buffer();
1201 SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1202 int fd = fd().intValue();
1203 IoRegistration registration = registration();
1204 long addr = Buffer.memoryAddress(remoteAddressMemory);
1205 IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, Native.SIZEOF_SOCKADDR_UN, nextOpsId());
1206 connectId = registration.submit(ops);
1207 if (connectId == 0) {
1208
1209 freeRemoteAddressMemory();
1210 }
1211 }
1212
1213 @Override
1214 protected Object filterOutboundMessage(Object msg) {
1215 if (msg instanceof ByteBuf) {
1216 ByteBuf buf = (ByteBuf) msg;
1217 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1218 }
1219 throw new UnsupportedOperationException("unsupported message type");
1220 }
1221
1222 @Override
1223 protected void doRegister(ChannelPromise promise) {
1224 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1225 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1226 if (f.isSuccess()) {
1227 registration = (IoRegistration) f.getNow();
1228 promise.setSuccess();
1229 } else {
1230 promise.setFailure(f.cause());
1231 }
1232 });
1233 }
1234
1235 @Override
1236 protected final void doDeregister() {
1237
1238 ioUringUnsafe().cancelOps(connectPromise != null);
1239 }
1240
1241 @Override
1242 protected void doBind(final SocketAddress local) throws Exception {
1243 if (local instanceof InetSocketAddress) {
1244 checkResolvable((InetSocketAddress) local);
1245 }
1246 socket.bind(local);
1247 this.local = socket.localAddress();
1248 }
1249
1250 protected static void checkResolvable(InetSocketAddress addr) {
1251 if (addr.isUnresolved()) {
1252 throw new UnresolvedAddressException();
1253 }
1254 }
1255
1256 @Override
1257 protected final SocketAddress localAddress0() {
1258 return local;
1259 }
1260
1261 @Override
1262 protected final SocketAddress remoteAddress0() {
1263 return remote;
1264 }
1265
1266 private static boolean isAllowHalfClosure(ChannelConfig config) {
1267 return config instanceof SocketChannelConfig &&
1268 ((SocketChannelConfig) config).isAllowHalfClosure();
1269 }
1270
1271 private void cancelConnectTimeoutFuture() {
1272 if (connectTimeoutFuture != null) {
1273 connectTimeoutFuture.cancel(false);
1274 connectTimeoutFuture = null;
1275 }
1276 }
1277
1278 private void computeRemote() {
1279 if (requestedRemoteAddress instanceof InetSocketAddress) {
1280 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1281 }
1282 }
1283
1284 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1285 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1286 }
1287
1288
1289
1290
1291
1292
1293
1294 protected abstract boolean socketIsEmpty(int flags);
1295
1296 abstract boolean isPollInFirst();
1297 }