1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.Errors;
43 import io.netty.channel.unix.FileDescriptor;
44 import io.netty.channel.unix.UnixChannel;
45 import io.netty.channel.unix.UnixChannelUtil;
46 import io.netty.util.ReferenceCountUtil;
47 import io.netty.util.concurrent.PromiseNotifier;
48 import io.netty.util.internal.logging.InternalLogger;
49 import io.netty.util.internal.logging.InternalLoggerFactory;
50
51 import java.io.IOException;
52 import java.net.InetSocketAddress;
53 import java.net.SocketAddress;
54 import java.nio.ByteBuffer;
55 import java.nio.channels.AlreadyConnectedException;
56 import java.nio.channels.ClosedChannelException;
57 import java.nio.channels.ConnectionPendingException;
58 import java.nio.channels.NotYetConnectedException;
59 import java.nio.channels.UnresolvedAddressException;
60 import java.util.concurrent.ScheduledFuture;
61 import java.util.concurrent.TimeUnit;
62
63 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
64 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
65 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
66 import static io.netty.util.internal.ObjectUtil.checkNotNull;
67
68
69 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
// The socket this channel wraps; also returned by fd().
final LinuxSocket socket;
// Value returned by isActive(); written in the constructors, on successful connect and in doClose().
protected volatile boolean active;

// Bit flags stored in ioState, one per kind of operation that is currently scheduled.
// Note that bit 1 (1 << 1) is not assigned to any flag in this file.
private static final int POLL_IN_SCHEDULED = 1;
private static final int POLL_OUT_SCHEDULED = 1 << 2;
private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
private static final int WRITE_SCHEDULED = 1 << 4;
private static final int READ_SCHEDULED = 1 << 5;
private static final int CONNECT_SCHEDULED = 1 << 6;

// Counter handed out by nextOpsId(); starts at the smallest short value.
private short opsId = Short.MIN_VALUE;

// Ids returned by registration.submit(...) for the respective operation. The poll ids are used by
// doDeregister() to build the matching poll-remove, connectId is used by close(...) to cancel a connect.
private long pollInId;
private long pollOutId;
private long pollRdhupId;
private long connectId;

// Combination of the *_SCHEDULED flags above. All flags currently fit into a byte.
private byte ioState;

// Number of write / read operations that were scheduled and did not complete yet.
// scheduleWrite(...) refuses to schedule more writes once numOutstandingWrites reaches Short.MAX_VALUE.
private short numOutstandingWrites;
private short numOutstandingReads;

// Set by doBeginRead() when a read was requested but could not be started right away.
private boolean readPending;
// true while readComplete0(...) is running (see readComplete(...)).
private boolean inReadComplete;

// Promise recorded by the first close(...) call; non-null means a close was requested.
private ChannelPromise delayedClose;
// Set by shutdownInput(true); once set doBeginRead() does not start any further reads.
private boolean inputClosedSeenErrorOnRead;

// Promise of the connect attempt that is currently in progress. connect(...) fails with
// ConnectionPendingException while this is non-null.
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
// Memory that holds the socket address passed to the connect operation (see submitConnect(...)).
private ByteBuffer remoteAddressMemory;
// Message header used for the sendmsg(...) based TCP fast-open connect (see connect(...)).
private MsgHdrMemoryArray msgHdrMemoryArray;

// Assigned once doRegister(...) succeeded and reset to null by doDeregister().
private IoUringIoRegistration registration;

// Cached addresses, returned by localAddress0() / remoteAddress0().
private volatile SocketAddress local;
private volatile SocketAddress remote;
117
118 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
119 super(parent);
120 this.socket = checkNotNull(socket, "fd");
121
122 if (active) {
123
124
125 this.active = true;
126 this.local = socket.localAddress();
127 this.remote = socket.remoteAddress();
128 }
129
130 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
131 }
132
/**
 * Creates a new channel that is active from the start and whose remote address is already known.
 *
 * @param parent the parent channel handed to the super constructor
 * @param fd     the socket to wrap; must not be {@code null}
 * @param remote the remote address to cache; the local address is read from {@code fd}
 */
AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    active = true;

    // Cache both addresses right away.
    this.remote = remote;
    local = fd.localAddress();
}
143
144
145
146
147
148
149 protected final short nextOpsId() {
150 short id = opsId++;
151
152
153 if (id == 0) {
154 id = opsId++;
155 }
156 return id;
157 }
158
159 public final boolean isOpen() {
160 return socket.isOpen();
161 }
162
/**
 * Returns the current value of the {@code active} flag.
 */
@Override
public boolean isActive() {
    return active;
}
167
/**
 * Returns the socket wrapped by this channel as its {@link FileDescriptor}.
 */
@Override
public final FileDescriptor fd() {
    return socket;
}
172
/**
 * Returns {@link #unsafe()} cast to {@link AbstractUringUnsafe}.
 */
private AbstractUringUnsafe ioUringUnsafe() {
    return (AbstractUringUnsafe) unsafe();
}
176
177 @Override
178 protected boolean isCompatible(final EventLoop loop) {
179 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
180 }
181
/**
 * Same as {@link #newDirectBuffer(Object, ByteBuf)} with {@code buf} used as its own holder.
 *
 * @param buf the buffer to copy and release
 * @return the buffer returned by {@link #newDirectBuffer(Object, ByteBuf)}
 */
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
    return newDirectBuffer(buf, buf);
}
185
186 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
187 final int readableBytes = buf.readableBytes();
188 if (readableBytes == 0) {
189 ReferenceCountUtil.release(holder);
190 return Unpooled.EMPTY_BUFFER;
191 }
192
193 final ByteBufAllocator alloc = alloc();
194 if (alloc.isDirectBufferPooled()) {
195 return newDirectBuffer0(holder, buf, alloc, readableBytes);
196 }
197
198 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
199 if (directBuf == null) {
200 return newDirectBuffer0(holder, buf, alloc, readableBytes);
201 }
202
203 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
204 ReferenceCountUtil.safeRelease(holder);
205 return directBuf;
206 }
207
208 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
209 final ByteBuf directBuf = alloc.directBuffer(capacity);
210 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
211 ReferenceCountUtil.safeRelease(holder);
212 return directBuf;
213 }
214
215
216
217
218
219
220
/**
 * Hook for sub-classes to cancel read operations that did not complete yet. It is invoked from
 * {@code close(ChannelPromise)} while the channel is still registered.
 *
 * @param registration        the registration of this channel
 * @param numOutstandingReads the number of reads that are currently outstanding
 */
protected abstract void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads);
222
223
224
225
226
227
228
/**
 * Hook for sub-classes to cancel write operations that did not complete yet. It is invoked from
 * {@code close(ChannelPromise)} while the channel is still registered.
 *
 * @param registration         the registration of this channel
 * @param numOutstandingWrites the number of writes that are currently outstanding
 */
protected abstract void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites);
230
/**
 * Does nothing in this base class.
 */
@Override
protected void doDisconnect() throws Exception {
}
234
235 private void freeRemoteAddressMemory() {
236 if (remoteAddressMemory != null) {
237 Buffer.free(remoteAddressMemory);
238 remoteAddressMemory = null;
239 }
240 }
241
242 private void freeMsgHdrArray() {
243 if (msgHdrMemoryArray != null) {
244 msgHdrMemoryArray.release();
245 msgHdrMemoryArray = null;
246 }
247 }
248
249 @Override
250 protected void doClose() throws Exception {
251 active = false;
252
253 if (registration != null) {
254 if (socket.markClosed()) {
255 int fd = fd().intValue();
256 IoUringIoOps ops = IoUringIoOps.newClose(fd, 0, nextOpsId());
257 registration.submit(ops);
258 }
259 } else {
260
261 socket.close();
262 freeRemoteAddressMemory();
263 freeMsgHdrArray();
264 }
265 }
266
267 @Override
268 protected final void doBeginRead() {
269 if (inputClosedSeenErrorOnRead) {
270
271 return;
272 }
273 if (readPending) {
274
275 return;
276 }
277 if (inReadComplete || !isActive()) {
278
279
280
281 readPending = true;
282 return;
283 }
284 doBeginReadNow();
285 }
286
287 private void doBeginReadNow() {
288 if (socket.isBlocking()) {
289
290 ioUringUnsafe().scheduleFirstReadIfNeeded();
291 } else {
292 if ((ioState & POLL_IN_SCHEDULED) == 0) {
293 ioUringUnsafe().schedulePollIn();
294 }
295 }
296 }
297
298 @Override
299 protected void doWrite(ChannelOutboundBuffer in) {
300 if ((ioState & WRITE_SCHEDULED) != 0) {
301 return;
302 }
303 if (scheduleWrite(in) > 0) {
304 ioState |= WRITE_SCHEDULED;
305 }
306 }
307
308 private int scheduleWrite(ChannelOutboundBuffer in) {
309 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
310 return 0;
311 }
312 if (in == null) {
313 return 0;
314 }
315
316 int msgCount = in.size();
317 if (msgCount == 0) {
318 return 0;
319 }
320 Object msg = in.current();
321
322 if (msgCount > 1) {
323 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
324 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
325 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
326
327 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
328 } else {
329 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
330 }
331
332 assert numOutstandingWrites > 0;
333 return numOutstandingWrites;
334 }
335
/**
 * Returns the current registration. Must only be called while the channel is registered,
 * which is checked with an assertion.
 */
protected final IoUringIoRegistration registration() {
    assert registration != null;
    return registration;
}
340
341 private void schedulePollOut() {
342
343 assert !socket.isBlocking();
344
345 assert (ioState & POLL_OUT_SCHEDULED) == 0;
346 int fd = fd().intValue();
347 IoUringIoRegistration registration = registration();
348 IoUringIoOps ops = IoUringIoOps.newPollAdd(
349 fd, 0, Native.POLLOUT, (short) Native.POLLOUT);
350 pollOutId = registration.submit(ops);
351 ioState |= POLL_OUT_SCHEDULED;
352 }
353
354 final void schedulePollRdHup() {
355 assert (ioState & POLL_RDHUP_SCHEDULED) == 0;
356 int fd = fd().intValue();
357 IoUringIoRegistration registration = registration();
358 IoUringIoOps ops = IoUringIoOps.newPollAdd(
359 fd, 0, Native.POLLRDHUP, (short) Native.POLLRDHUP);
360 pollRdhupId = registration.submit(ops);
361 ioState |= POLL_RDHUP_SCHEDULED;
362 }
363
/**
 * Re-reads the local and remote address from the socket and replaces the cached values.
 */
final void resetCachedAddresses() {
    local = socket.localAddress();
    remote = socket.remoteAddress();
}
368
369 abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
// Lazily created by recvBufAllocHandle().
private IoUringRecvByteAllocatorHandle allocHandle;

/**
 * Schedules the write of multiple messages / buffers of the given outbound buffer.
 *
 * @param in the outbound buffer to write from
 * @return the number of write operations that were scheduled; the caller stores it as the number of
 *         outstanding writes and asserts that it is greater than {@code 0}
 */
protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);

/**
 * Schedules the write of a single message.
 *
 * @param msg the message to write
 * @return the number of write operations that were scheduled; the caller stores it as the number of
 *         outstanding writes and asserts that it is greater than {@code 0}
 */
protected abstract int scheduleWriteSingle(Object msg);

// Set once the completion of the close operation was handled.
private boolean closed;
// Set once freeResourcesNow(...) was called, so it is called only once.
private boolean freed;
386
387 @Override
388 public final void handle(IoRegistration registration, IoEvent ioEvent) {
389 IoUringIoRegistration reg = (IoUringIoRegistration) registration;
390 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
391 byte op = event.opcode();
392 int res = event.res();
393 int flags = event.flags();
394 short data = event.data();
395 switch (op) {
396 case Native.IORING_OP_RECV:
397 case Native.IORING_OP_ACCEPT:
398 case Native.IORING_OP_RECVMSG:
399 case Native.IORING_OP_READ:
400 readComplete(res, flags, data);
401
402
403
404 handleDelayedClosed();
405 break;
406 case Native.IORING_OP_WRITEV:
407 case Native.IORING_OP_SEND:
408 case Native.IORING_OP_SENDMSG:
409 case Native.IORING_OP_WRITE:
410 writeComplete(res, flags, data);
411
412
413
414 handleDelayedClosed();
415 break;
416 case Native.IORING_OP_POLL_ADD:
417 pollAddComplete(res, flags, data);
418 break;
419 case Native.IORING_OP_POLL_REMOVE:
420
421
422 if (res == 0) {
423 clearPollFlag(data);
424 }
425 break;
426 case Native.IORING_OP_ASYNC_CANCEL:
427 cancelComplete0(res, flags, data);
428 break;
429 case Native.IORING_OP_CONNECT:
430 connectComplete(res, flags, data);
431
432
433 freeMsgHdrArray();
434 freeRemoteAddressMemory();
435 break;
436 case Native.IORING_OP_CLOSE:
437 if (delayedClose != null) {
438 delayedClose.setSuccess();
439 }
440 closed = true;
441 break;
442 }
443
444 if (ioState == 0 && closed && !freed) {
445 freed = true;
446 freeResourcesNow(reg);
447 }
448 }
449
450
451
452
453
454
/**
 * Frees the native memory held by the channel and cancels the given registration. Called by
 * {@code handle(...)} at most once, after the close completed and no operation is scheduled anymore.
 *
 * @param reg the registration to cancel
 */
protected void freeResourcesNow(IoUringIoRegistration reg) {
    freeMsgHdrArray();
    freeRemoteAddressMemory();
    reg.cancel();
}
460
461 private void handleDelayedClosed() {
462 if (delayedClose != null && canCloseNow()) {
463 closeNow(newPromise());
464 }
465 }
466
467 private void pollAddComplete(int res, int flags, short data) {
468 if ((data & Native.POLLOUT) != 0) {
469 pollOut(res);
470 }
471 if ((data & Native.POLLIN) != 0) {
472 pollIn(res);
473 }
474 if ((data & Native.POLLRDHUP) != 0) {
475 pollRdHup(res);
476 }
477 }
478
/**
 * Closes the channel using the void promise.
 */
@Override
public final void close() throws Exception {
    close(voidPromise());
}
483
/**
 * Requests the close of the channel. The first call records the promise (or a new one if it is void)
 * in {@code delayedClose}, fails a pending connect, asks for cancellation of outstanding operations
 * and closes right away if no read or write is scheduled. Later calls only chain their promise to the
 * recorded one.
 *
 * @param promise the promise to notify; the recorded promise is completed when the completion of the
 *                close operation is handled in {@code handle(...)}
 */
@Override
public final void close(ChannelPromise promise) {
    if (delayedClose == null) {
        // Remember that a close was requested. The actual close may have to wait until scheduled
        // reads and writes completed, see canCloseNow().
        delayedClose = promise.isVoid() ? newPromise() : promise;
    } else {
        // A close was requested before, just notify this promise together with the recorded one.
        delayedClose.addListener(new PromiseNotifier<>(false, promise));
        return;
    }

    boolean cancelConnect = false;
    try {
        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
        if (connectPromise != null) {
            // tryFailure() does not throw if the promise was completed already.
            connectPromise.tryFailure(new ClosedChannelException());
            AbstractIoUringChannel.this.connectPromise = null;
            cancelConnect = true;
        }

        cancelConnectTimeoutFuture();
    } finally {
        // Ask for cancellation of everything that is still outstanding, even if the code above failed,
        // so that a delayed close can proceed once the completions arrive.
        if (registration != null) {
            if (cancelConnect && connectId != 0) {
                int fd = fd().intValue();
                // Cancel the connect operation that was submitted before.
                registration.submit(IoUringIoOps.newAsyncCancel(
                        fd, 0, connectId, Native.IORING_OP_CONNECT));
            }
            cancelOutstandingReads(registration, numOutstandingReads);
            cancelOutstandingWrites(registration, numOutstandingWrites);
        }
    }

    if (canCloseNow()) {
        // Neither a read nor a write is scheduled, close without waiting.
        closeNow(newPromise());
    }
}
527
528 private boolean canCloseNow() {
529
530
531 return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
532 }
533
/**
 * Performs the close by delegating to the close implementation of the super class.
 *
 * @param promise the promise handed to the super class
 */
private void closeNow(ChannelPromise promise) {
    super.close(promise);
}
537
538 @Override
539 protected final void flush0() {
540
541
542
543 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
544 super.flush0();
545 }
546 }
547
548 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
549 if (promise == null) {
550
551 return;
552 }
553
554
555 promise.tryFailure(cause);
556 closeIfClosed();
557 }
558
/**
 * Completes a successful connect: marks the channel active, caches the addresses, schedules the
 * POLLRDHUP, completes the promise and fires {@code channelActive} if the channel became active.
 * Does nothing if there is no promise.
 *
 * @param promise   the connect promise, may be {@code null}
 * @param wasActive whether the channel was active before the connect completed
 */
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        // There is no connect promise to complete.
        return;
    }
    active = true;

    if (local == null) {
        local = socket.localAddress();
    }
    computeRemote();

    // Get notified about POLLRDHUP from now on.
    schedulePollRdHup();

    // Read the state before completing the promise: a listener of the promise may close the channel,
    // and channelActive still has to be fired in that case.
    boolean active = isActive();

    // trySuccess() returns false if the promise was completed already, e.g. cancelled by the user.
    boolean promiseSet = promise.trySuccess();

    // Fire channelActive regardless of the outcome of trySuccess().
    if (!wasActive && active) {
        pipeline().fireChannelActive();
    }

    // The promise could not be completed, so close the channel.
    if (!promiseSet) {
        close(voidPromise());
    }
}
592
593 @Override
594 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
595 if (allocHandle == null) {
596 allocHandle = new IoUringRecvByteAllocatorHandle(
597 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
598 }
599 return allocHandle;
600 }
601
/**
 * Shuts down the input side of the channel. If half closure is allowed the input of the socket is
 * shut down and a {@link ChannelInputShutdownEvent} is fired, otherwise the channel is closed.
 *
 * @param allDataRead if {@code true} a {@link ChannelInputShutdownReadComplete} event is fired
 *                    (once) and no further reads are started by {@code doBeginRead()}
 */
final void shutdownInput(boolean allDataRead) {
    logger.trace("shutdownInput Fd: {}", fd().intValue());
    if (!socket.isInputShutdown()) {
        if (isAllowHalfClosure(config())) {
            try {
                socket.shutdown(true, false);
            } catch (IOException ignored) {
                // The shutdown failed with an I/O error: fire the event and close the channel
                // instead of keeping it half open.
                fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
                return;
            } catch (NotYetConnectedException ignore) {
                // The socket is not connected, so there is no input to shut down.
                // Fall through and fire the event below.
            }
            pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
        } else {
            // Half closure is not allowed, close the whole channel.
            close(voidPromise());
            return;
        }
    }
    if (allDataRead && !inputClosedSeenErrorOnRead) {
        inputClosedSeenErrorOnRead = true;
        pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
    }
}
628
/**
 * Fires the given user event through the pipeline and closes the channel afterwards.
 *
 * @param evt the user event to fire
 */
private void fireEventAndClose(Object evt) {
    pipeline().fireUserEventTriggered(evt);
    close(voidPromise());
}
633
634 final void schedulePollIn() {
635
636 assert !socket.isBlocking();
637
638 assert (ioState & POLL_IN_SCHEDULED) == 0;
639 if (!isActive() || shouldBreakIoUringInReady(config())) {
640 return;
641 }
642 int fd = fd().intValue();
643 IoUringIoRegistration registration = registration();
644 IoUringIoOps ops = IoUringIoOps.newPollAdd(
645 fd, 0, Native.POLLIN, (short) Native.POLLIN);
646 pollInId = registration.submit(ops);
647 ioState |= POLL_IN_SCHEDULED;
648 }
649
/**
 * Handles the completion of a read operation: updates the bookkeeping of outstanding reads, hands
 * the result to {@code readComplete0(...)} and starts a read that was requested in the meantime.
 *
 * @param res   the result of the operation
 * @param flags the flags of the completion
 * @param data  the data of the completion
 */
private void readComplete(int res, int flags, int data) {
    assert numOutstandingReads > 0;
    if (--numOutstandingReads == 0) {
        // This was the last outstanding read.
        readPending = false;
        ioState &= ~READ_SCHEDULED;
    }
    // While this flag is set doBeginRead() only records a read request instead of starting the read.
    inReadComplete = true;
    try {
        readComplete0(res, flags, data, numOutstandingReads);
    } finally {
        inReadComplete = false;
        // A read was requested while readComplete0(...) was running and the allocator handle reports
        // the read as complete, so start the requested read now.
        if (readPending && recvBufAllocHandle().isReadComplete()) {
            doBeginReadNow();
        }
    }
}
668
669
670
671
/**
 * Called by {@code readComplete(...)} to process the result of a read operation.
 *
 * @param res                  the result of the operation
 * @param flags                the flags of the completion
 * @param data                 the data of the completion
 * @param outstandingCompletes the number of reads that are still outstanding after this one
 */
protected abstract void readComplete0(int res, int flags, int data, int outstandingCompletes);
673
674
675
676
677 private void pollRdHup(int res) {
678 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
679 return;
680 }
681 ioState &= ~POLL_RDHUP_SCHEDULED;
682
683
684 recvBufAllocHandle().rdHupReceived();
685
686 if (isActive()) {
687 scheduleFirstReadIfNeeded();
688 } else {
689
690 shutdownInput(false);
691 }
692 }
693
694
695
696
697 private void pollIn(int res) {
698 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
699 return;
700 }
701 ioState &= ~POLL_IN_SCHEDULED;
702
703 scheduleFirstReadIfNeeded();
704 }
705
706 private void scheduleFirstReadIfNeeded() {
707 if ((ioState & READ_SCHEDULED) == 0) {
708 scheduleFirstRead();
709 }
710 }
711
712 private void scheduleFirstRead() {
713
714 final ChannelConfig config = config();
715 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
716 allocHandle.reset(config);
717 scheduleRead(true);
718 }
719
720 protected final void scheduleRead(boolean first) {
721
722 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
723 numOutstandingReads = (short) scheduleRead0(first);
724 if (numOutstandingReads > 0) {
725 ioState |= READ_SCHEDULED;
726 }
727 }
728 }
729
730
731
732
733
734
735
/**
 * Schedules one or more read operations.
 *
 * @param first whether this is the first read of a read loop
 * @return the number of read operations that were scheduled; the caller stores it as the number of
 *         outstanding reads and marks a read as scheduled if it is greater than {@code 0}
 */
protected abstract int scheduleRead0(boolean first);
737
738
739
740
741
742
/**
 * Handles the completion of the POLLOUT poll. A cancelled poll is ignored. If a connect is in
 * progress the connect is finished (or POLLOUT is scheduled again if it is still in progress),
 * otherwise pending data is flushed unless the output was shut down.
 *
 * @param res the result of the poll operation
 */
private void pollOut(int res) {
    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        return;
    }
    ioState &= ~POLL_OUT_SCHEDULED;

    if (connectPromise != null) {
        // A connect attempt is in progress, POLLOUT signals that its outcome can be checked.

        assert eventLoop().inEventLoop();

        boolean connectStillInProgress = false;
        try {
            boolean wasActive = isActive();
            if (!socket.finishConnect()) {
                connectStillInProgress = true;
                return;
            }
            fulfillConnectPromise(connectPromise, wasActive);
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
        } finally {
            if (!connectStillInProgress) {
                // The connect attempt is over (successful or not): the timeout is not needed anymore
                // and a new connect attempt may be started.
                cancelConnectTimeoutFuture();
                connectPromise = null;
            } else {
                // POLLOUT is only used for non-blocking sockets.
                assert !socket.isBlocking();

                // Not connected yet, wait for the next POLLOUT.
                schedulePollOut();
            }
        }
    } else if (!socket.isOutputShutdown()) {
        // The socket became writable again, flush what is pending.
        super.flush0();
    }
}
785
786
787
788
789
790
791
792
/**
 * Handles the completion of a write operation. While a connect is scheduled the completion belongs
 * to the sendmsg(...) that {@code connect(...)} submits for TCP fast-open and is handled as part of
 * the connect. Otherwise the bookkeeping of outstanding writes is updated and the result is handed to
 * {@code writeComplete0(...)}.
 *
 * @param res   the result of the operation
 * @param flags the flags of the completion
 * @param data  the data of the completion
 */
private void writeComplete(int res, int flags, short data) {
    if ((ioState & CONNECT_SCHEDULED) != 0) {
        // This is the result of the sendmsg(...) that was submitted by connect(...).
        // The message header is not needed anymore.
        freeMsgHdrArray();
        if (res > 0) {
            // Data was sent, remove it from the outbound buffer.
            outboundBuffer().removeBytes(res);

            // Report the connect as successful; 0 is the result connectComplete(...) treats as success.
            connectComplete(0, flags, data);
        } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
            // No data was sent and the connection is not established yet.
            // Submit a regular connect for the requested address instead.
            //
            // NOTE(review): presumably this is the case where no fast-open cookie exists yet - confirm.
            submitConnect((InetSocketAddress) requestedRemoteAddress);
        } else {
            // Any other result is handled like the result of a regular connect.
            connectComplete(res, flags, data);
        }
        return;
    }
    assert numOutstandingWrites > 0;
    --numOutstandingWrites;

    boolean writtenAll = writeComplete0(res, flags, data, numOutstandingWrites);
    if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
        // POLLOUT is only used for non-blocking sockets.
        assert !socket.isBlocking();

        // Not everything could be written, wait until the socket is writable again.
        schedulePollOut();
    }

    // WRITE_SCHEDULED is only cleared after writeComplete0(...) returned, so that doWrite(...) does not
    // schedule a new write while the completion is still being processed.
    if (numOutstandingWrites == 0) {
        ioState &= ~WRITE_SCHEDULED;

        // Everything was written and no POLLOUT is scheduled, try to write what is left in the buffer.
        if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
            doWrite(unsafe().outboundBuffer());
        }
    }
}
839
840
841
842
843
844
845
846
847
/**
 * Called by {@code writeComplete(...)} to process the result of a write operation.
 *
 * @param res         the result of the operation
 * @param flags       the flags of the completion
 * @param data        the data of the completion
 * @param outstanding the number of writes that are still outstanding after this one
 * @return {@code true} if everything was written; if {@code false} the caller schedules a POLLOUT
 *         (unless one is scheduled already)
 */
abstract boolean writeComplete0(int res, int flags, int data, int outstanding);
849
850
851
852
853
854
855
856
/**
 * Called by {@code handle(...)} when an async cancel operation completed. Does nothing in this
 * base class.
 *
 * @param res   the result of the operation
 * @param flags the flags of the completion
 * @param data  the data of the completion
 */
void cancelComplete0(int res, int flags, short data) {
    // Nothing to do by default.
}
860
861
862
863
864
865
866
867
/**
 * Handles the result of a connect operation.
 *
 * @param res   the result of the operation: {@code 0} completes the connect successfully, "in progress"
 *              and "already" results wait for POLLOUT, a cancelled operation only clears the connect
 *              state and every other value fails the connect promise
 * @param flags the flags of the completion (unused)
 * @param data  the data of the completion (unused)
 */
void connectComplete(int res, int flags, short data) {
    ioState &= ~CONNECT_SCHEDULED;
    freeRemoteAddressMemory();

    if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
        // POLLOUT is only used for non-blocking sockets.
        assert !socket.isBlocking();

        // The connect did not finish yet, pollOut(...) continues once the socket is writable.
        schedulePollOut();
    } else {
        try {
            if (res == 0) {
                fulfillConnectPromise(connectPromise, active);
                if (readPending) {
                    // A read was requested while the channel was not active yet, start it now.
                    doBeginReadNow();
                }
            } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
                try {
                    // Converts the error code into the matching exception.
                    Errors.throwConnectException("io_uring connect", res);
                } catch (Throwable cause) {
                    fulfillConnectPromise(connectPromise, cause);
                }
            }
        } finally {
            // The connect attempt is over: the timeout is not needed anymore and a new connect
            // attempt may be started.
            cancelConnectTimeoutFuture();
            connectPromise = null;
        }
    }
}
901
/**
 * Starts a connect attempt. Depending on the configuration either a sendmsg(...) with
 * {@code MSG_FASTOPEN} that carries the first flushed buffer or a regular connect is submitted.
 * The promise is completed once the result is handled, see {@code connectComplete(...)},
 * {@code writeComplete(...)} and {@code pollOut(...)}.
 *
 * @param remoteAddress the address to connect to; cast to {@link InetSocketAddress}
 * @param localAddress  the address to bind to before connecting, or {@code null}
 * @param promise       the promise to notify about the outcome
 */
@Override
public void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // The promise is deliberately not marked as uncancellable: the listener added at the end of this
    // method closes the channel when the promise is cancelled.
    if (promise.isDone() || !ensureOpen(promise)) {
        return;
    }

    if (delayedClose != null) {
        // A close was requested already.
        promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
        return;
    }
    try {
        if (connectPromise != null) {
            throw new ConnectionPendingException();
        }
        if (localAddress instanceof InetSocketAddress) {
            checkResolvable((InetSocketAddress) localAddress);
        }

        if (remoteAddress instanceof InetSocketAddress) {
            checkResolvable((InetSocketAddress) remoteAddress);
        }

        if (remote != null) {
            // A remote address is cached already, so the channel is considered connected.
            throw new AlreadyConnectedException();
        }

        if (localAddress != null) {
            socket.bind(localAddress);
        }

        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;

        // With TCP fast-open enabled the first flushed buffer (if any) is sent as part of the connect.
        ByteBuf initialData = null;
        if (IoUring.isTcpFastOpenClientSideAvailable() &&
            config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
            ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
            outbound.addFlush();
            Object curr;
            if ((curr = outbound.current()) instanceof ByteBuf) {
                initialData = (ByteBuf) curr;
            }
        }
        if (initialData != null) {
            msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
            MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
            hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
                    initialData.readableBytes(), (short) 0);

            int fd = fd().intValue();
            IoUringIoRegistration registration = registration();
            IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, 0, Native.MSG_FASTOPEN,
                    hdr.address(), hdr.idx());
            connectId = registration.submit(ops);
        } else {
            submitConnect(inetSocketAddress);
        }

        // writeComplete(...) uses this flag to recognize the result of the sendmsg(...) above.
        ioState |= CONNECT_SCHEDULED;
    } catch (Throwable t) {
        closeIfClosed();
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        return;
    }
    connectPromise = promise;
    requestedRemoteAddress = remoteAddress;

    // Schedule the connect timeout if one is configured.
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                // Only close the channel if this task was the one that failed the promise.
                if (connectPromise != null && !connectPromise.isDone() &&
                        connectPromise.tryFailure(new ConnectTimeoutException(
                                "connection timed out: " + remoteAddress))) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    promise.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            // If the connect promise is cancelled, cancel the timeout as well and close the channel.
            if (future.isCancelled()) {
                cancelConnectTimeoutFuture();
                connectPromise = null;
                close(voidPromise());
            }
        }
    });
}
1002 }
1003
1004 private void submitConnect(InetSocketAddress inetSocketAddress) {
1005 remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1006 long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
1007
1008 SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
1009
1010 int fd = fd().intValue();
1011 IoUringIoRegistration registration = registration();
1012 IoUringIoOps ops = IoUringIoOps.newConnect(
1013 fd, 0, remoteAddressMemoryAddress, nextOpsId());
1014 connectId = registration.submit(ops);
1015 }
1016
1017 @Override
1018 protected Object filterOutboundMessage(Object msg) {
1019 if (msg instanceof ByteBuf) {
1020 ByteBuf buf = (ByteBuf) msg;
1021 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1022 }
1023
1024 throw new UnsupportedOperationException("unsupported message type");
1025 }
1026
1027 @Override
1028 protected void doRegister(ChannelPromise promise) {
1029 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1030 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1031 if (f.isSuccess()) {
1032 registration = (IoUringIoRegistration) f.getNow();
1033 promise.setSuccess();
1034 } else {
1035 promise.setFailure(f.cause());
1036 }
1037 });
1038 }
1039
1040 @Override
1041 protected final void doDeregister() {
1042 if (registration == null) {
1043 return;
1044 }
1045 int fd = fd().intValue();
1046 if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
1047 registration.submit(IoUringIoOps.newPollRemove(
1048 fd, 0, pollRdhupId, (short) Native.POLLRDHUP));
1049 }
1050 if ((ioState & POLL_IN_SCHEDULED) != 0) {
1051 registration.submit(IoUringIoOps.newPollRemove(
1052 fd, 0, pollInId, (short) Native.POLLIN));
1053 }
1054 if ((ioState & POLL_OUT_SCHEDULED) != 0) {
1055 registration.submit(IoUringIoOps.newPollRemove(
1056 fd, 0, pollOutId, (short) Native.POLLOUT));
1057 }
1058
1059 registration = null;
1060 }
1061
1062 @Override
1063 protected void doBind(final SocketAddress local) throws Exception {
1064 if (local instanceof InetSocketAddress) {
1065 checkResolvable((InetSocketAddress) local);
1066 }
1067 socket.bind(local);
1068 this.local = socket.localAddress();
1069 }
1070
1071 protected static void checkResolvable(InetSocketAddress addr) {
1072 if (addr.isUnresolved()) {
1073 throw new UnresolvedAddressException();
1074 }
1075 }
1076
/**
 * Returns the cached local address, which may be {@code null} if none was cached yet.
 */
@Override
protected final SocketAddress localAddress0() {
    return local;
}
1081
/**
 * Returns the cached remote address, which may be {@code null} if none was cached yet.
 */
@Override
protected final SocketAddress remoteAddress0() {
    return remote;
}
1086
1087 private static boolean isAllowHalfClosure(ChannelConfig config) {
1088 return config instanceof SocketChannelConfig &&
1089 ((SocketChannelConfig) config).isAllowHalfClosure();
1090 }
1091
1092 private void cancelConnectTimeoutFuture() {
1093 if (connectTimeoutFuture != null) {
1094 connectTimeoutFuture.cancel(false);
1095 connectTimeoutFuture = null;
1096 }
1097 }
1098
1099 private void computeRemote() {
1100 if (requestedRemoteAddress instanceof InetSocketAddress) {
1101 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1102 }
1103 }
1104
1105 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1106 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1107 }
1108
1109 private void clearPollFlag(int pollMask) {
1110 if ((pollMask & Native.POLLIN) != 0) {
1111 ioState &= ~POLL_IN_SCHEDULED;
1112 }
1113 if ((pollMask & Native.POLLOUT) != 0) {
1114 ioState &= ~POLL_OUT_SCHEDULED;
1115 }
1116 if ((pollMask & Native.POLLRDHUP) != 0) {
1117 ioState &= ~POLL_RDHUP_SCHEDULED;
1118 }
1119 }
1120 }