1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.Buffer;
41 import io.netty.channel.unix.Errors;
42 import io.netty.channel.unix.FileDescriptor;
43 import io.netty.channel.unix.UnixChannel;
44 import io.netty.channel.unix.UnixChannelUtil;
45 import io.netty.util.ReferenceCountUtil;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.logging.InternalLogger;
48 import io.netty.util.internal.logging.InternalLoggerFactory;
49
50 import java.io.IOException;
51 import java.net.InetSocketAddress;
52 import java.net.SocketAddress;
53 import java.nio.ByteBuffer;
54 import java.nio.channels.AlreadyConnectedException;
55 import java.nio.channels.ClosedChannelException;
56 import java.nio.channels.ConnectionPendingException;
57 import java.nio.channels.NotYetConnectedException;
58 import java.nio.channels.UnresolvedAddressException;
59 import java.util.concurrent.ScheduledFuture;
60 import java.util.concurrent.TimeUnit;
61
62 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
63 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
64 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
65 import static io.netty.util.internal.ObjectUtil.checkNotNull;
66
67
68 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
69 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
70 final LinuxSocket socket;
71 protected volatile boolean active;
72
73
74 private static final int POLL_IN_SCHEDULED = 1;
75 private static final int POLL_OUT_SCHEDULED = 1 << 2;
76 private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
77 private static final int WRITE_SCHEDULED = 1 << 4;
78 private static final int READ_SCHEDULED = 1 << 5;
79 private static final int CONNECT_SCHEDULED = 1 << 6;
80
81 private short opsId = Short.MIN_VALUE;
82
83 private long pollInId;
84 private long pollOutId;
85 private long pollRdhupId;
86 private long connectId;
87
88
89 private byte ioState;
90
91
92
93
94 private short numOutstandingWrites;
95 private short numOutstandingReads;
96
97 private boolean readPending;
98 private boolean inReadComplete;
99
100 private ChannelPromise delayedClose;
101 private boolean inputClosedSeenErrorOnRead;
102
103
104
105
106 private ChannelPromise connectPromise;
107 private ScheduledFuture<?> connectTimeoutFuture;
108 private SocketAddress requestedRemoteAddress;
109 private ByteBuffer remoteAddressMemory;
110 private MsgHdrMemoryArray msgHdrMemoryArray;
111
112 private IoUringIoRegistration registration;
113
114 private volatile SocketAddress local;
115 private volatile SocketAddress remote;
116
117 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
118 super(parent);
119 this.socket = checkNotNull(socket, "fd");
120
121 if (active) {
122
123
124 this.active = true;
125 this.local = socket.localAddress();
126 this.remote = socket.remoteAddress();
127 }
128
129 if (parent != null) {
130 logger.trace("Create Channel Socket: {}", socket.intValue());
131 } else {
132 logger.trace("Create Server Socket: {}", socket.intValue());
133 }
134 }
135
136 AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
137 super(parent);
138 this.socket = checkNotNull(fd, "fd");
139 this.active = true;
140
141
142
143 this.remote = remote;
144 this.local = fd.localAddress();
145 }
146
147
148
149
150
151
152 protected final short nextOpsId() {
153 short id = opsId++;
154
155
156 if (id == 0) {
157 id = opsId++;
158 }
159 return id;
160 }
161
162 public final boolean isOpen() {
163 return socket.isOpen();
164 }
165
166 @Override
167 public boolean isActive() {
168 return active;
169 }
170
171 @Override
172 public final FileDescriptor fd() {
173 return socket;
174 }
175
176 private AbstractUringUnsafe ioUringUnsafe() {
177 return (AbstractUringUnsafe) unsafe();
178 }
179
180 @Override
181 protected boolean isCompatible(final EventLoop loop) {
182 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
183 }
184
185 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
186 return newDirectBuffer(buf, buf);
187 }
188
189 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
190 final int readableBytes = buf.readableBytes();
191 if (readableBytes == 0) {
192 ReferenceCountUtil.release(holder);
193 return Unpooled.EMPTY_BUFFER;
194 }
195
196 final ByteBufAllocator alloc = alloc();
197 if (alloc.isDirectBufferPooled()) {
198 return newDirectBuffer0(holder, buf, alloc, readableBytes);
199 }
200
201 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
202 if (directBuf == null) {
203 return newDirectBuffer0(holder, buf, alloc, readableBytes);
204 }
205
206 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
207 ReferenceCountUtil.safeRelease(holder);
208 return directBuf;
209 }
210
211 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
212 final ByteBuf directBuf = alloc.directBuffer(capacity);
213 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
214 ReferenceCountUtil.safeRelease(holder);
215 return directBuf;
216 }
217
218
219
220
221
222
223
224 protected abstract void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads);
225
226
227
228
229
230
231
232 protected abstract void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites);
233
234 @Override
235 protected void doDisconnect() throws Exception {
236 }
237
238 private void freeRemoteAddressMemory() {
239 if (remoteAddressMemory != null) {
240 Buffer.free(remoteAddressMemory);
241 remoteAddressMemory = null;
242 }
243 }
244
245 private void freeMsgHdrArray() {
246 if (msgHdrMemoryArray != null) {
247 msgHdrMemoryArray.release();
248 msgHdrMemoryArray = null;
249 }
250 }
251
252 @Override
253 protected void doClose() throws Exception {
254 active = false;
255
256 if (registration != null) {
257 if (socket.markClosed()) {
258 int fd = fd().intValue();
259 IoUringIoOps ops = IoUringIoOps.newClose(fd, 0, nextOpsId());
260 registration.submit(ops);
261 }
262 } else {
263
264 socket.close();
265 freeRemoteAddressMemory();
266 freeMsgHdrArray();
267 }
268 }
269
270 @Override
271 protected final void doBeginRead() {
272 if (inputClosedSeenErrorOnRead) {
273
274 return;
275 }
276 if (readPending) {
277
278 return;
279 }
280 if (inReadComplete) {
281
282
283
284 readPending = true;
285 return;
286 }
287 doBeginReadNow();
288 }
289
290 private void doBeginReadNow() {
291 if (socket.isBlocking()) {
292
293 ioUringUnsafe().scheduleFirstReadIfNeeded();
294 } else {
295 if ((ioState & POLL_IN_SCHEDULED) == 0) {
296 ioUringUnsafe().schedulePollIn();
297 }
298 }
299 }
300
301 @Override
302 protected void doWrite(ChannelOutboundBuffer in) {
303 if ((ioState & WRITE_SCHEDULED) != 0) {
304 return;
305 }
306 if (scheduleWrite(in) > 0) {
307 ioState |= WRITE_SCHEDULED;
308 }
309 }
310
311 private int scheduleWrite(ChannelOutboundBuffer in) {
312 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
313 return 0;
314 }
315 if (in == null) {
316 return 0;
317 }
318
319 int msgCount = in.size();
320 if (msgCount == 0) {
321 return 0;
322 }
323 Object msg = in.current();
324
325 if (msgCount > 1) {
326 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
327 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
328 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
329
330 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
331 } else {
332 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
333 }
334
335 assert numOutstandingWrites > 0;
336 return numOutstandingWrites;
337 }
338
339 protected final IoUringIoRegistration registration() {
340 assert registration != null;
341 return registration;
342 }
343
344 private void schedulePollOut() {
345
346 assert !socket.isBlocking();
347
348 assert (ioState & POLL_OUT_SCHEDULED) == 0;
349 int fd = fd().intValue();
350 IoUringIoRegistration registration = registration();
351 IoUringIoOps ops = IoUringIoOps.newPollAdd(
352 fd, 0, Native.POLLOUT, (short) Native.POLLOUT);
353 pollOutId = registration.submit(ops);
354 ioState |= POLL_OUT_SCHEDULED;
355 }
356
357 final void schedulePollRdHup() {
358 assert (ioState & POLL_RDHUP_SCHEDULED) == 0;
359 int fd = fd().intValue();
360 IoUringIoRegistration registration = registration();
361 IoUringIoOps ops = IoUringIoOps.newPollAdd(
362 fd, 0, Native.POLLRDHUP, (short) Native.POLLRDHUP);
363 pollRdhupId = registration.submit(ops);
364 ioState |= POLL_RDHUP_SCHEDULED;
365 }
366
367 final void resetCachedAddresses() {
368 local = socket.localAddress();
369 remote = socket.remoteAddress();
370 }
371
372 abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
373 private IoUringRecvByteAllocatorHandle allocHandle;
374
375
376
377
378
379 protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
380
381
382
383
384
385 protected abstract int scheduleWriteSingle(Object msg);
386
387 private boolean closed;
388
389 @Override
390 public final void handle(IoRegistration registration, IoEvent ioEvent) {
391 IoUringIoRegistration reg = (IoUringIoRegistration) registration;
392 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
393 byte op = event.opcode();
394 int res = event.res();
395 int flags = event.flags();
396 short data = event.data();
397 switch (op) {
398 case Native.IORING_OP_RECV:
399 case Native.IORING_OP_ACCEPT:
400 case Native.IORING_OP_RECVMSG:
401 case Native.IORING_OP_READ:
402 readComplete(res, flags, data);
403
404
405
406 handleDelayedClosed();
407 break;
408 case Native.IORING_OP_WRITEV:
409 case Native.IORING_OP_SEND:
410 case Native.IORING_OP_SENDMSG:
411 case Native.IORING_OP_WRITE:
412 writeComplete(res, flags, data);
413
414
415
416 handleDelayedClosed();
417 break;
418 case Native.IORING_OP_POLL_ADD:
419 pollAddComplete(res, flags, data);
420 break;
421 case Native.IORING_OP_POLL_REMOVE:
422
423
424 if (res == 0) {
425 clearPollFlag(data);
426 }
427 break;
428 case Native.IORING_OP_ASYNC_CANCEL:
429 cancelComplete0(res, flags, data);
430 break;
431 case Native.IORING_OP_CONNECT:
432 connectComplete(res, flags, data);
433
434
435 freeMsgHdrArray();
436 freeRemoteAddressMemory();
437 break;
438 case Native.IORING_OP_CLOSE:
439 if (delayedClose != null) {
440 delayedClose.setSuccess();
441 }
442 closed = true;
443 break;
444 }
445
446 if (ioState == 0 && closed) {
447 freeResourcesNow(reg);
448 }
449 }
450
451
452
453
454
455
456 protected void freeResourcesNow(IoUringIoRegistration reg) {
457 freeMsgHdrArray();
458 freeRemoteAddressMemory();
459 reg.cancel();
460 }
461
462 private void handleDelayedClosed() {
463 if (delayedClose != null && canCloseNow()) {
464 closeNow(newPromise());
465 }
466 }
467
468 private void pollAddComplete(int res, int flags, short data) {
469 if ((data & Native.POLLOUT) != 0) {
470 pollOut(res);
471 }
472 if ((data & Native.POLLIN) != 0) {
473 pollIn(res);
474 }
475 if ((data & Native.POLLRDHUP) != 0) {
476 pollRdHup(res);
477 }
478 }
479
480 @Override
481 public final void close() throws Exception {
482 close(voidPromise());
483 }
484
485 @Override
486 public final void close(ChannelPromise promise) {
487 if (delayedClose == null) {
488
489
490
491 delayedClose = promise.isVoid() ? newPromise() : promise;
492 } else {
493 delayedClose.addListener(new PromiseNotifier<>(false, promise));
494 return;
495 }
496
497 boolean cancelConnect = false;
498 try {
499 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
500 if (connectPromise != null) {
501
502 connectPromise.tryFailure(new ClosedChannelException());
503 AbstractIoUringChannel.this.connectPromise = null;
504 cancelConnect = true;
505 }
506
507 cancelConnectTimeoutFuture();
508 } finally {
509
510
511 if (registration != null) {
512 if (cancelConnect && connectId != 0) {
513 int fd = fd().intValue();
514
515 registration.submit(IoUringIoOps.newAsyncCancel(
516 fd, 0, connectId, Native.IORING_OP_CONNECT));
517 }
518 cancelOutstandingReads(registration, numOutstandingReads);
519 cancelOutstandingWrites(registration, numOutstandingWrites);
520 }
521 }
522
523 if (canCloseNow()) {
524
525 closeNow(newPromise());
526 }
527 }
528
529 private boolean canCloseNow() {
530
531
532 return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
533 }
534
535 private void closeNow(ChannelPromise promise) {
536 super.close(promise);
537 }
538
539 @Override
540 protected final void flush0() {
541
542
543
544 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
545 super.flush0();
546 }
547 }
548
549 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
550 if (promise == null) {
551
552 return;
553 }
554
555
556 promise.tryFailure(cause);
557 closeIfClosed();
558 }
559
560 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
561 if (promise == null) {
562
563 return;
564 }
565 active = true;
566
567 if (local == null) {
568 local = socket.localAddress();
569 }
570 computeRemote();
571
572
573 schedulePollRdHup();
574
575
576
577 boolean active = isActive();
578
579
580 boolean promiseSet = promise.trySuccess();
581
582
583
584 if (!wasActive && active) {
585 pipeline().fireChannelActive();
586 }
587
588
589 if (!promiseSet) {
590 close(voidPromise());
591 }
592 }
593
594 @Override
595 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
596 if (allocHandle == null) {
597 allocHandle = new IoUringRecvByteAllocatorHandle(
598 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
599 }
600 return allocHandle;
601 }
602
603 final void shutdownInput(boolean rdHup) {
604 logger.trace("shutdownInput Fd: {}", fd().intValue());
605 if (!socket.isInputShutdown()) {
606 if (isAllowHalfClosure(config())) {
607 try {
608 socket.shutdown(true, false);
609 } catch (IOException ignored) {
610
611
612 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
613 return;
614 } catch (NotYetConnectedException ignore) {
615
616
617 }
618 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
619 } else {
620 close(voidPromise());
621 }
622 } else if (!rdHup) {
623 inputClosedSeenErrorOnRead = true;
624 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
625 }
626 }
627
628 private void fireEventAndClose(Object evt) {
629 pipeline().fireUserEventTriggered(evt);
630 close(voidPromise());
631 }
632
633 final void schedulePollIn() {
634
635 assert !socket.isBlocking();
636
637 assert (ioState & POLL_IN_SCHEDULED) == 0;
638 if (!isActive() || shouldBreakIoUringInReady(config())) {
639 return;
640 }
641 int fd = fd().intValue();
642 IoUringIoRegistration registration = registration();
643 IoUringIoOps ops = IoUringIoOps.newPollAdd(
644 fd, 0, Native.POLLIN, (short) Native.POLLIN);
645 pollInId = registration.submit(ops);
646 ioState |= POLL_IN_SCHEDULED;
647 }
648
649 private void readComplete(int res, int flags, int data) {
650 assert numOutstandingReads > 0;
651 if (--numOutstandingReads == 0) {
652 readPending = false;
653 ioState &= ~READ_SCHEDULED;
654 }
655 inReadComplete = true;
656 try {
657 readComplete0(res, flags, data, numOutstandingReads);
658 } finally {
659 inReadComplete = false;
660
661
662 if (readPending && recvBufAllocHandle().isReadComplete()) {
663 doBeginReadNow();
664 }
665 }
666 }
667
668
669
670
671 protected abstract void readComplete0(int res, int flags, int data, int outstandingCompletes);
672
673
674
675
676 private void pollRdHup(int res) {
677 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
678 return;
679 }
680 ioState &= ~POLL_RDHUP_SCHEDULED;
681
682
683 recvBufAllocHandle().rdHupReceived();
684
685 if (isActive()) {
686 scheduleFirstReadIfNeeded();
687 } else {
688
689 shutdownInput(true);
690 }
691 }
692
693
694
695
696 private void pollIn(int res) {
697 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
698 return;
699 }
700 ioState &= ~POLL_IN_SCHEDULED;
701
702 scheduleFirstReadIfNeeded();
703 }
704
705 private void scheduleFirstReadIfNeeded() {
706 if ((ioState & READ_SCHEDULED) == 0) {
707 scheduleFirstRead();
708 }
709 }
710
711 private void scheduleFirstRead() {
712
713 final ChannelConfig config = config();
714 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
715 allocHandle.reset(config);
716 scheduleRead(true);
717 }
718
719 protected final void scheduleRead(boolean first) {
720
721 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
722 numOutstandingReads = (short) scheduleRead0(first);
723 if (numOutstandingReads > 0) {
724 ioState |= READ_SCHEDULED;
725 }
726 }
727 }
728
729
730
731
732
733
734
735 protected abstract int scheduleRead0(boolean first);
736
737
738
739
740
741
742 private void pollOut(int res) {
743 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
744 return;
745 }
746 ioState &= ~POLL_OUT_SCHEDULED;
747
748 if (connectPromise != null) {
749
750
751
752 assert eventLoop().inEventLoop();
753
754 boolean connectStillInProgress = false;
755 try {
756 boolean wasActive = isActive();
757 if (!socket.finishConnect()) {
758 connectStillInProgress = true;
759 return;
760 }
761 fulfillConnectPromise(connectPromise, wasActive);
762 } catch (Throwable t) {
763 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
764 } finally {
765 if (!connectStillInProgress) {
766
767
768
769 cancelConnectTimeoutFuture();
770 connectPromise = null;
771 } else {
772
773 assert !socket.isBlocking();
774
775
776 schedulePollOut();
777 }
778 }
779 } else if (!socket.isOutputShutdown()) {
780
781 super.flush0();
782 }
783 }
784
785
786
787
788
789
790
791
792 private void writeComplete(int res, int flags, short data) {
793 if ((ioState & CONNECT_SCHEDULED) != 0) {
794
795
796 freeMsgHdrArray();
797 if (res > 0) {
798
799 outboundBuffer().removeBytes(res);
800
801
802 connectComplete(0, flags, data);
803 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
804
805
806
807
808 submitConnect((InetSocketAddress) requestedRemoteAddress);
809 } else {
810
811 connectComplete(res, flags, data);
812 }
813 return;
814 }
815 assert numOutstandingWrites > 0;
816 --numOutstandingWrites;
817
818 boolean writtenAll = writeComplete0(res, flags, data, numOutstandingWrites);
819 if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
820
821 assert !socket.isBlocking();
822
823
824 schedulePollOut();
825 }
826
827
828
829 if (numOutstandingWrites == 0) {
830 ioState &= ~WRITE_SCHEDULED;
831
832
833 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
834 doWrite(unsafe().outboundBuffer());
835 }
836 }
837 }
838
839
840
841
842
843
844
845
846
847 abstract boolean writeComplete0(int res, int flags, int data, int outstanding);
848
849
850
851
852
853
854
855
856 void cancelComplete0(int res, int flags, short data) {
857
858 }
859
860
861
862
863
864
865
866
867 void connectComplete(int res, int flags, short data) {
868 ioState &= ~CONNECT_SCHEDULED;
869 freeRemoteAddressMemory();
870
871 if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
872
873 assert !socket.isBlocking();
874
875
876 schedulePollOut();
877 } else {
878 try {
879 if (res == 0) {
880 fulfillConnectPromise(connectPromise, active);
881 } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
882 try {
883 Errors.throwConnectException("io_uring connect", res);
884 } catch (Throwable cause) {
885 fulfillConnectPromise(connectPromise, cause);
886 }
887 }
888 } finally {
889
890
891
892 cancelConnectTimeoutFuture();
893 connectPromise = null;
894 }
895 }
896 }
897
898 @Override
899 public void connect(
900 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
901
902
903 if (promise.isDone() || !ensureOpen(promise)) {
904 return;
905 }
906
907 if (delayedClose != null) {
908 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
909 return;
910 }
911 try {
912 if (connectPromise != null) {
913 throw new ConnectionPendingException();
914 }
915 if (localAddress instanceof InetSocketAddress) {
916 checkResolvable((InetSocketAddress) localAddress);
917 }
918
919 if (remoteAddress instanceof InetSocketAddress) {
920 checkResolvable((InetSocketAddress) remoteAddress);
921 }
922
923 if (remote != null) {
924
925
926
927 throw new AlreadyConnectedException();
928 }
929
930 if (localAddress != null) {
931 socket.bind(localAddress);
932 }
933
934 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
935
936 ByteBuf initialData = null;
937 if (IoUring.isTcpFastOpenClientSideAvailable() &&
938 config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
939 ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
940 outbound.addFlush();
941 Object curr;
942 if ((curr = outbound.current()) instanceof ByteBuf) {
943 initialData = (ByteBuf) curr;
944 }
945 }
946 if (initialData != null) {
947 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
948 MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
949 hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
950 initialData.readableBytes(), (short) 0);
951
952 int fd = fd().intValue();
953 IoUringIoRegistration registration = registration();
954 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, 0, Native.MSG_FASTOPEN,
955 hdr.address(), hdr.idx());
956 connectId = registration.submit(ops);
957 } else {
958 submitConnect(inetSocketAddress);
959 }
960
961 ioState |= CONNECT_SCHEDULED;
962 } catch (Throwable t) {
963 closeIfClosed();
964 promise.tryFailure(annotateConnectException(t, remoteAddress));
965 return;
966 }
967 connectPromise = promise;
968 requestedRemoteAddress = remoteAddress;
969
970 int connectTimeoutMillis = config().getConnectTimeoutMillis();
971 if (connectTimeoutMillis > 0) {
972 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
973 @Override
974 public void run() {
975 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
976 if (connectPromise != null && !connectPromise.isDone() &&
977 connectPromise.tryFailure(new ConnectTimeoutException(
978 "connection timed out: " + remoteAddress))) {
979 close(voidPromise());
980 }
981 }
982 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
983 }
984
985 promise.addListener(new ChannelFutureListener() {
986 @Override
987 public void operationComplete(ChannelFuture future) {
988
989
990 if (future.isCancelled()) {
991 cancelConnectTimeoutFuture();
992 connectPromise = null;
993 close(voidPromise());
994 }
995 }
996 });
997 }
998 }
999
1000 private void submitConnect(InetSocketAddress inetSocketAddress) {
1001 remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1002 long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
1003
1004 SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
1005
1006 int fd = fd().intValue();
1007 IoUringIoRegistration registration = registration();
1008 IoUringIoOps ops = IoUringIoOps.newConnect(
1009 fd, 0, remoteAddressMemoryAddress, nextOpsId());
1010 connectId = registration.submit(ops);
1011 }
1012
1013 @Override
1014 protected Object filterOutboundMessage(Object msg) {
1015 if (msg instanceof ByteBuf) {
1016 ByteBuf buf = (ByteBuf) msg;
1017 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1018 }
1019
1020 throw new UnsupportedOperationException("unsupported message type");
1021 }
1022
1023 @Override
1024 protected void doRegister(ChannelPromise promise) {
1025 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1026 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1027 if (f.isSuccess()) {
1028 registration = (IoUringIoRegistration) f.getNow();
1029 promise.setSuccess();
1030 } else {
1031 promise.setFailure(f.cause());
1032 }
1033 });
1034 }
1035
1036 @Override
1037 protected final void doDeregister() {
1038 if (registration == null) {
1039 return;
1040 }
1041 int fd = fd().intValue();
1042 if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
1043 registration.submit(IoUringIoOps.newPollRemove(
1044 fd, 0, pollRdhupId, (short) Native.POLLRDHUP));
1045 }
1046 if ((ioState & POLL_IN_SCHEDULED) != 0) {
1047 registration.submit(IoUringIoOps.newPollRemove(
1048 fd, 0, pollInId, (short) Native.POLLIN));
1049 }
1050 if ((ioState & POLL_OUT_SCHEDULED) != 0) {
1051 registration.submit(IoUringIoOps.newPollRemove(
1052 fd, 0, pollOutId, (short) Native.POLLOUT));
1053 }
1054
1055 registration = null;
1056 }
1057
1058 @Override
1059 protected void doBind(final SocketAddress local) throws Exception {
1060 if (local instanceof InetSocketAddress) {
1061 checkResolvable((InetSocketAddress) local);
1062 }
1063 socket.bind(local);
1064 this.local = socket.localAddress();
1065 }
1066
1067 protected static void checkResolvable(InetSocketAddress addr) {
1068 if (addr.isUnresolved()) {
1069 throw new UnresolvedAddressException();
1070 }
1071 }
1072
1073 @Override
1074 protected final SocketAddress localAddress0() {
1075 return local;
1076 }
1077
1078 @Override
1079 protected final SocketAddress remoteAddress0() {
1080 return remote;
1081 }
1082
1083 private static boolean isAllowHalfClosure(ChannelConfig config) {
1084 return config instanceof SocketChannelConfig &&
1085 ((SocketChannelConfig) config).isAllowHalfClosure();
1086 }
1087
1088 private void cancelConnectTimeoutFuture() {
1089 if (connectTimeoutFuture != null) {
1090 connectTimeoutFuture.cancel(false);
1091 connectTimeoutFuture = null;
1092 }
1093 }
1094
1095 private void computeRemote() {
1096 if (requestedRemoteAddress instanceof InetSocketAddress) {
1097 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1098 }
1099 }
1100
1101 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1102 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1103 }
1104
1105 private void clearPollFlag(int pollMask) {
1106 if ((pollMask & Native.POLLIN) != 0) {
1107 ioState &= ~POLL_IN_SCHEDULED;
1108 }
1109 if ((pollMask & Native.POLLOUT) != 0) {
1110 ioState &= ~POLL_OUT_SCHEDULED;
1111 }
1112 if ((pollMask & Native.POLLRDHUP) != 0) {
1113 ioState &= ~POLL_RDHUP_SCHEDULED;
1114 }
1115 }
1116 }