1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.Errors;
43 import io.netty.channel.unix.FileDescriptor;
44 import io.netty.channel.unix.UnixChannel;
45 import io.netty.channel.unix.UnixChannelUtil;
46 import io.netty.util.ReferenceCountUtil;
47 import io.netty.util.internal.logging.InternalLogger;
48 import io.netty.util.internal.logging.InternalLoggerFactory;
49
50 import java.io.IOException;
51 import java.net.InetSocketAddress;
52 import java.net.SocketAddress;
53 import java.nio.ByteBuffer;
54 import java.nio.channels.AlreadyConnectedException;
55 import java.nio.channels.ClosedChannelException;
56 import java.nio.channels.ConnectionPendingException;
57 import java.nio.channels.NotYetConnectedException;
58 import java.nio.channels.UnresolvedAddressException;
59 import java.util.concurrent.ScheduledFuture;
60 import java.util.concurrent.TimeUnit;
61
62 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
63 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
64 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
65 import static io.netty.util.internal.ObjectUtil.checkNotNull;
66
67
68 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
69 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
70 final LinuxSocket socket;
71 protected volatile boolean active;
72
73
74 private static final int POLL_IN_SCHEDULED = 1;
75 private static final int POLL_OUT_SCHEDULED = 1 << 2;
76 private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
77 private static final int WRITE_SCHEDULED = 1 << 4;
78 private static final int READ_SCHEDULED = 1 << 5;
79 private static final int CONNECT_SCHEDULED = 1 << 6;
80
81 private short opsId = Short.MIN_VALUE;
82
83 private long pollInId;
84 private long pollOutId;
85 private long pollRdhupId;
86 private long connectId;
87
88
89 private byte ioState;
90
91
92
93
94 private short numOutstandingWrites;
95 private short numOutstandingReads;
96
97 private boolean readPending;
98 private boolean inReadComplete;
99
100 private boolean inputClosedSeenErrorOnRead;
101
102
103
104
105 private ChannelPromise connectPromise;
106 private ScheduledFuture<?> connectTimeoutFuture;
107 private SocketAddress requestedRemoteAddress;
108 private ByteBuffer remoteAddressMemory;
109 private MsgHdrMemoryArray msgHdrMemoryArray;
110
111 private IoUringIoRegistration registration;
112
113 private volatile SocketAddress local;
114 private volatile SocketAddress remote;
115
116 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
117 super(parent);
118 this.socket = checkNotNull(socket, "fd");
119
120 if (active) {
121
122
123 this.active = true;
124 this.local = socket.localAddress();
125 this.remote = socket.remoteAddress();
126 }
127
128 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
129 }
130
131 AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
132 super(parent);
133 this.socket = checkNotNull(fd, "fd");
134 this.active = true;
135
136
137
138 this.remote = remote;
139 this.local = fd.localAddress();
140 }
141
142
143
144
145
146
147 protected final short nextOpsId() {
148 short id = opsId++;
149
150
151 if (id == 0) {
152 id = opsId++;
153 }
154 return id;
155 }
156
157 public final boolean isOpen() {
158 return socket.isOpen();
159 }
160
161 @Override
162 public boolean isActive() {
163 return active;
164 }
165
166 @Override
167 public final FileDescriptor fd() {
168 return socket;
169 }
170
171 private AbstractUringUnsafe ioUringUnsafe() {
172 return (AbstractUringUnsafe) unsafe();
173 }
174
175 @Override
176 protected boolean isCompatible(final EventLoop loop) {
177 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
178 }
179
180 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
181 return newDirectBuffer(buf, buf);
182 }
183
184 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
185 final int readableBytes = buf.readableBytes();
186 if (readableBytes == 0) {
187 ReferenceCountUtil.release(holder);
188 return Unpooled.EMPTY_BUFFER;
189 }
190
191 final ByteBufAllocator alloc = alloc();
192 if (alloc.isDirectBufferPooled()) {
193 return newDirectBuffer0(holder, buf, alloc, readableBytes);
194 }
195
196 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
197 if (directBuf == null) {
198 return newDirectBuffer0(holder, buf, alloc, readableBytes);
199 }
200
201 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
202 ReferenceCountUtil.safeRelease(holder);
203 return directBuf;
204 }
205
206 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
207 final ByteBuf directBuf = alloc.directBuffer(capacity);
208 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
209 ReferenceCountUtil.safeRelease(holder);
210 return directBuf;
211 }
212
213
214
215
216
217
218
219 protected abstract void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads);
220
221
222
223
224
225
226
227 protected abstract void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites);
228
229 @Override
230 protected void doDisconnect() throws Exception {
231 }
232
233 private void freeRemoteAddressMemory() {
234 if (remoteAddressMemory != null) {
235 Buffer.free(remoteAddressMemory);
236 remoteAddressMemory = null;
237 }
238 }
239
240 private void freeMsgHdrArray() {
241 if (msgHdrMemoryArray != null) {
242 msgHdrMemoryArray.release();
243 msgHdrMemoryArray = null;
244 }
245 }
246
247 @Override
248 protected void doClose() throws Exception {
249 active = false;
250
251 if (registration != null && registration.isValid()) {
252 if (socket.markClosed()) {
253 boolean cancelConnect = false;
254 try {
255 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
256 if (connectPromise != null) {
257
258 connectPromise.tryFailure(new ClosedChannelException());
259 AbstractIoUringChannel.this.connectPromise = null;
260 cancelConnect = true;
261 }
262
263 cancelConnectTimeoutFuture();
264 } finally {
265
266
267 ioUringUnsafe().cancelOps(cancelConnect);
268 }
269
270 int fd = fd().intValue();
271 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
272 registration.submit(ops);
273 }
274 } else {
275
276 socket.close();
277 ioUringUnsafe().freeResourcesNowIfNeeded(null);
278 }
279 }
280
281 @Override
282 protected final void doBeginRead() {
283 if (inputClosedSeenErrorOnRead) {
284
285 return;
286 }
287 if (readPending) {
288
289 return;
290 }
291 if (inReadComplete || !isActive()) {
292
293
294
295 readPending = true;
296 return;
297 }
298 doBeginReadNow();
299 }
300
301 private void doBeginReadNow() {
302 if (socket.isBlocking()) {
303
304 ioUringUnsafe().scheduleFirstReadIfNeeded();
305 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
306 ioUringUnsafe().schedulePollIn();
307 }
308 }
309
310 @Override
311 protected void doWrite(ChannelOutboundBuffer in) {
312 if ((ioState & WRITE_SCHEDULED) != 0) {
313 return;
314 }
315 if (scheduleWrite(in) > 0) {
316 ioState |= WRITE_SCHEDULED;
317 }
318 }
319
320 private int scheduleWrite(ChannelOutboundBuffer in) {
321 if (numOutstandingWrites == Short.MAX_VALUE) {
322 return 0;
323 }
324 if (in == null) {
325 return 0;
326 }
327
328 int msgCount = in.size();
329 if (msgCount == 0) {
330 return 0;
331 }
332 Object msg = in.current();
333
334 if (msgCount > 1 && in.current() instanceof ByteBuf) {
335 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
336 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
337 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
338
339 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
340 } else {
341 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
342 }
343
344 assert numOutstandingWrites > 0;
345 return numOutstandingWrites;
346 }
347
348 protected final IoUringIoRegistration registration() {
349 assert registration != null;
350 return registration;
351 }
352
353 private void schedulePollOut() {
354
355 assert !socket.isBlocking();
356 pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT);
357 }
358
359 final void schedulePollRdHup() {
360 pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP);
361 }
362
363 private long schedulePollAdd(int ioMask, int mask) {
364 assert (ioState & ioMask) == 0;
365 int fd = fd().intValue();
366 IoUringIoRegistration registration = registration();
367 IoUringIoOps ops = IoUringIoOps.newPollAdd(
368 fd, (byte) 0, mask, (short) mask);
369 long id = registration.submit(ops);
370 if (id != 0) {
371 ioState |= ioMask;
372 }
373 return id;
374 }
375
376 final void resetCachedAddresses() {
377 local = socket.localAddress();
378 remote = socket.remoteAddress();
379 }
380
381 abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
382 private IoUringRecvByteAllocatorHandle allocHandle;
383
384
385
386
387
388 protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
389
390
391
392
393
394 protected abstract int scheduleWriteSingle(Object msg);
395
396 private boolean closed;
397 private boolean freed;
398
399 @Override
400 public final void handle(IoRegistration registration, IoEvent ioEvent) {
401 IoUringIoRegistration reg = (IoUringIoRegistration) registration;
402 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
403 byte op = event.opcode();
404 int res = event.res();
405 int flags = event.flags();
406 short data = event.data();
407 switch (op) {
408 case Native.IORING_OP_RECV:
409 case Native.IORING_OP_ACCEPT:
410 case Native.IORING_OP_RECVMSG:
411 case Native.IORING_OP_READ:
412 readComplete(op, res, flags, data);
413 break;
414 case Native.IORING_OP_WRITEV:
415 case Native.IORING_OP_SEND:
416 case Native.IORING_OP_SENDMSG:
417 case Native.IORING_OP_WRITE:
418 case Native.IORING_OP_SPLICE:
419 writeComplete(op, res, flags, data);
420 break;
421 case Native.IORING_OP_POLL_ADD:
422 pollAddComplete(res, flags, data);
423 break;
424 case Native.IORING_OP_POLL_REMOVE:
425
426
427 if (res == 0) {
428 clearPollFlag(data);
429 }
430 break;
431 case Native.IORING_OP_ASYNC_CANCEL:
432 cancelComplete0(op, res, flags, data);
433 break;
434 case Native.IORING_OP_CONNECT:
435 connectComplete(op, res, flags, data);
436
437
438 freeMsgHdrArray();
439 freeRemoteAddressMemory();
440 break;
441 case Native.IORING_OP_CLOSE:
442 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
443 closed = true;
444 }
445 break;
446 }
447
448 if (ioState == 0 && closed) {
449 freeResourcesNowIfNeeded(reg);
450 }
451 }
452
453 private void freeResourcesNowIfNeeded(IoUringIoRegistration reg) {
454 if (!freed) {
455 freed = true;
456 freeResourcesNow(reg);
457 }
458 }
459
460
461
462
463
464
465 protected void freeResourcesNow(IoUringIoRegistration reg) {
466 freeMsgHdrArray();
467 freeRemoteAddressMemory();
468 if (reg != null) {
469 reg.cancel();
470 }
471 }
472
473 private void pollAddComplete(int res, int flags, short data) {
474 if ((data & Native.POLLOUT) != 0) {
475 pollOut(res);
476 }
477 if ((data & Native.POLLIN) != 0) {
478 pollIn(res);
479 }
480 if ((data & Native.POLLRDHUP) != 0) {
481 pollRdHup(res);
482 }
483 }
484
485 @Override
486 public final void close() throws Exception {
487 close(voidPromise());
488 }
489
490 private void cancelOps(boolean cancelConnect) {
491 if (registration == null || !registration.isValid()) {
492 return;
493 }
494 int fd = fd().intValue();
495 if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
496 registration.submit(IoUringIoOps.newPollRemove(
497 fd, (byte) 0, pollRdhupId, (short) Native.POLLRDHUP));
498 }
499 if ((ioState & POLL_IN_SCHEDULED) != 0) {
500 registration.submit(IoUringIoOps.newPollRemove(
501 fd, (byte) 0, pollInId, (short) Native.POLLIN));
502 }
503 if ((ioState & POLL_OUT_SCHEDULED) != 0) {
504 registration.submit(IoUringIoOps.newPollRemove(
505 fd, (byte) 0, pollOutId, (short) Native.POLLOUT));
506 }
507 if (cancelConnect && connectId != 0) {
508
509 registration.submit(IoUringIoOps.newAsyncCancel(
510 fd, (byte) 0, connectId, Native.IORING_OP_CONNECT));
511 }
512 cancelOutstandingReads(registration, numOutstandingReads);
513 cancelOutstandingWrites(registration, numOutstandingWrites);
514 }
515
516 @Override
517 protected final void flush0() {
518
519
520
521 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
522 super.flush0();
523 }
524 }
525
526 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
527 if (promise == null) {
528
529 return;
530 }
531
532
533 promise.tryFailure(cause);
534 closeIfClosed();
535 }
536
537 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
538 if (promise == null) {
539
540 return;
541 }
542 active = true;
543
544 if (local == null) {
545 local = socket.localAddress();
546 }
547 computeRemote();
548
549
550 schedulePollRdHup();
551
552
553
554 boolean active = isActive();
555
556
557 boolean promiseSet = promise.trySuccess();
558
559
560
561 if (!wasActive && active) {
562 pipeline().fireChannelActive();
563 }
564
565
566 if (!promiseSet) {
567 close(voidPromise());
568 }
569 }
570
571 @Override
572 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
573 if (allocHandle == null) {
574 allocHandle = new IoUringRecvByteAllocatorHandle(
575 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
576 }
577 return allocHandle;
578 }
579
580 final void shutdownInput(boolean allDataRead) {
581 logger.trace("shutdownInput Fd: {}", fd().intValue());
582 if (!socket.isInputShutdown()) {
583 if (isAllowHalfClosure(config())) {
584 try {
585 socket.shutdown(true, false);
586 } catch (IOException ignored) {
587
588
589 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
590 return;
591 } catch (NotYetConnectedException ignore) {
592
593
594 }
595 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
596 } else {
597 close(voidPromise());
598 return;
599 }
600 }
601 if (allDataRead && !inputClosedSeenErrorOnRead) {
602 inputClosedSeenErrorOnRead = true;
603 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
604 }
605 }
606
607 private void fireEventAndClose(Object evt) {
608 pipeline().fireUserEventTriggered(evt);
609 close(voidPromise());
610 }
611
612 final void schedulePollIn() {
613
614 assert !socket.isBlocking();
615
616 assert (ioState & POLL_IN_SCHEDULED) == 0;
617 if (!isActive() || shouldBreakIoUringInReady(config())) {
618 return;
619 }
620 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN);
621 }
622
623 private void readComplete(byte op, int res, int flags, short data) {
624 assert numOutstandingReads > 0;
625 if (--numOutstandingReads == 0) {
626 readPending = false;
627 ioState &= ~READ_SCHEDULED;
628 }
629 inReadComplete = true;
630 try {
631 readComplete0(op, res, flags, data, numOutstandingReads);
632 } finally {
633 inReadComplete = false;
634
635
636 if (readPending && recvBufAllocHandle().isReadComplete()) {
637 doBeginReadNow();
638 }
639 }
640 }
641
642
643
644
645 protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
646
647
648
649
650 private void pollRdHup(int res) {
651 ioState &= ~POLL_RDHUP_SCHEDULED;
652 pollRdhupId = 0;
653 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
654 return;
655 }
656
657
658 recvBufAllocHandle().rdHupReceived();
659
660 if (isActive()) {
661 scheduleFirstReadIfNeeded();
662 } else {
663
664 shutdownInput(false);
665 }
666 }
667
668
669
670
671 private void pollIn(int res) {
672 ioState &= ~POLL_IN_SCHEDULED;
673 pollInId = 0;
674 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
675 return;
676 }
677
678 scheduleFirstReadIfNeeded();
679 }
680
681 private void scheduleFirstReadIfNeeded() {
682 if ((ioState & READ_SCHEDULED) == 0) {
683 scheduleFirstRead();
684 }
685 }
686
687 private void scheduleFirstRead() {
688
689 final ChannelConfig config = config();
690 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
691 allocHandle.reset(config);
692 scheduleRead(true);
693 }
694
695 protected final void scheduleRead(boolean first) {
696
697 if (fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
698 numOutstandingReads = (short) scheduleRead0(first);
699 if (numOutstandingReads > 0) {
700 ioState |= READ_SCHEDULED;
701 }
702 }
703 }
704
705
706
707
708
709
710
711 protected abstract int scheduleRead0(boolean first);
712
713
714
715
716
717
718 private void pollOut(int res) {
719 ioState &= ~POLL_OUT_SCHEDULED;
720 pollOutId = 0;
721 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
722 return;
723 }
724
725 if (connectPromise != null) {
726
727
728
729 assert eventLoop().inEventLoop();
730
731 boolean connectStillInProgress = false;
732 try {
733 boolean wasActive = isActive();
734 if (!socket.finishConnect()) {
735 connectStillInProgress = true;
736 return;
737 }
738 fulfillConnectPromise(connectPromise, wasActive);
739 } catch (Throwable t) {
740 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
741 } finally {
742 if (!connectStillInProgress) {
743
744
745
746 cancelConnectTimeoutFuture();
747 connectPromise = null;
748 } else {
749
750 assert !socket.isBlocking();
751
752
753 schedulePollOut();
754 }
755 }
756 } else if (!socket.isOutputShutdown()) {
757
758 super.flush0();
759 }
760 }
761
762
763
764
765
766
767
768
769
770 private void writeComplete(byte op, int res, int flags, short data) {
771 if ((ioState & CONNECT_SCHEDULED) != 0) {
772
773
774 freeMsgHdrArray();
775 if (res > 0) {
776
777 outboundBuffer().removeBytes(res);
778
779
780 connectComplete(op, 0, flags, data);
781 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
782
783
784
785
786 submitConnect((InetSocketAddress) requestedRemoteAddress);
787 } else {
788
789 connectComplete(op, res, flags, data);
790 }
791 return;
792 }
793 assert numOutstandingWrites > 0;
794 --numOutstandingWrites;
795
796 boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
797 if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
798
799 assert !socket.isBlocking();
800
801
802 schedulePollOut();
803 }
804
805
806
807 if (numOutstandingWrites == 0) {
808 ioState &= ~WRITE_SCHEDULED;
809
810
811 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
812 doWrite(unsafe().outboundBuffer());
813 }
814 }
815 }
816
817
818
819
820
821
822
823
824
825 abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
826
827
828
829
830
831
832
833
834
835 void cancelComplete0(byte op, int res, int flags, short data) {
836
837 }
838
839
840
841
842
843
844
845
846 void connectComplete(byte op, int res, int flags, short data) {
847 ioState &= ~CONNECT_SCHEDULED;
848 freeRemoteAddressMemory();
849
850 if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
851
852 assert !socket.isBlocking();
853
854
855 schedulePollOut();
856 } else {
857 try {
858 if (res == 0) {
859 fulfillConnectPromise(connectPromise, active);
860 if (readPending) {
861 doBeginReadNow();
862 }
863 } else {
864 try {
865 Errors.throwConnectException("io_uring connect", res);
866 } catch (Throwable cause) {
867 fulfillConnectPromise(connectPromise, cause);
868 }
869 }
870 } finally {
871
872
873
874 cancelConnectTimeoutFuture();
875 connectPromise = null;
876 }
877 }
878 }
879
880 @Override
881 public void connect(
882 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
883
884
885 if (promise.isDone() || !ensureOpen(promise)) {
886 return;
887 }
888
889 if (!isOpen()) {
890 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
891 return;
892 }
893 try {
894 if (connectPromise != null) {
895 throw new ConnectionPendingException();
896 }
897 if (localAddress instanceof InetSocketAddress) {
898 checkResolvable((InetSocketAddress) localAddress);
899 }
900
901 if (remoteAddress instanceof InetSocketAddress) {
902 checkResolvable((InetSocketAddress) remoteAddress);
903 }
904
905 if (remote != null) {
906
907
908
909 throw new AlreadyConnectedException();
910 }
911
912 if (localAddress != null) {
913 socket.bind(localAddress);
914 }
915
916 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
917
918 ByteBuf initialData = null;
919 if (IoUring.isTcpFastOpenClientSideAvailable() &&
920 config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
921 ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
922 outbound.addFlush();
923 Object curr;
924 if ((curr = outbound.current()) instanceof ByteBuf) {
925 initialData = (ByteBuf) curr;
926 }
927 }
928 if (initialData != null) {
929 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
930 MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
931 hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
932 initialData.readableBytes(), (short) 0);
933
934 int fd = fd().intValue();
935 IoUringIoRegistration registration = registration();
936 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
937 hdr.address(), hdr.idx());
938 connectId = registration.submit(ops);
939 if (connectId == 0) {
940
941 freeMsgHdrArray();
942 }
943 } else {
944 submitConnect(inetSocketAddress);
945 }
946 if (connectId != 0) {
947 ioState |= CONNECT_SCHEDULED;
948 }
949 } catch (Throwable t) {
950 closeIfClosed();
951 promise.tryFailure(annotateConnectException(t, remoteAddress));
952 return;
953 }
954 connectPromise = promise;
955 requestedRemoteAddress = remoteAddress;
956
957 int connectTimeoutMillis = config().getConnectTimeoutMillis();
958 if (connectTimeoutMillis > 0) {
959 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
960 @Override
961 public void run() {
962 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
963 if (connectPromise != null && !connectPromise.isDone() &&
964 connectPromise.tryFailure(new ConnectTimeoutException(
965 "connection timed out: " + remoteAddress))) {
966 close(voidPromise());
967 }
968 }
969 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
970 }
971
972 promise.addListener(new ChannelFutureListener() {
973 @Override
974 public void operationComplete(ChannelFuture future) {
975
976
977 if (future.isCancelled()) {
978 cancelConnectTimeoutFuture();
979 connectPromise = null;
980 close(voidPromise());
981 }
982 }
983 });
984 }
985 }
986
987 private void submitConnect(InetSocketAddress inetSocketAddress) {
988 remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
989 long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
990
991 SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
992
993 int fd = fd().intValue();
994 IoUringIoRegistration registration = registration();
995 IoUringIoOps ops = IoUringIoOps.newConnect(
996 fd, (byte) 0, remoteAddressMemoryAddress, nextOpsId());
997 connectId = registration.submit(ops);
998 if (connectId == 0) {
999
1000 freeRemoteAddressMemory();
1001 }
1002 }
1003
1004 @Override
1005 protected Object filterOutboundMessage(Object msg) {
1006 if (msg instanceof ByteBuf) {
1007 ByteBuf buf = (ByteBuf) msg;
1008 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1009 }
1010 throw new UnsupportedOperationException("unsupported message type");
1011 }
1012
1013 @Override
1014 protected void doRegister(ChannelPromise promise) {
1015 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1016 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1017 if (f.isSuccess()) {
1018 registration = (IoUringIoRegistration) f.getNow();
1019 promise.setSuccess();
1020 } else {
1021 promise.setFailure(f.cause());
1022 }
1023 });
1024 }
1025
1026 @Override
1027 protected final void doDeregister() {
1028
1029 ioUringUnsafe().cancelOps(connectPromise != null);
1030 }
1031
1032 @Override
1033 protected void doBind(final SocketAddress local) throws Exception {
1034 if (local instanceof InetSocketAddress) {
1035 checkResolvable((InetSocketAddress) local);
1036 }
1037 socket.bind(local);
1038 this.local = socket.localAddress();
1039 }
1040
1041 protected static void checkResolvable(InetSocketAddress addr) {
1042 if (addr.isUnresolved()) {
1043 throw new UnresolvedAddressException();
1044 }
1045 }
1046
1047 @Override
1048 protected final SocketAddress localAddress0() {
1049 return local;
1050 }
1051
1052 @Override
1053 protected final SocketAddress remoteAddress0() {
1054 return remote;
1055 }
1056
1057 private static boolean isAllowHalfClosure(ChannelConfig config) {
1058 return config instanceof SocketChannelConfig &&
1059 ((SocketChannelConfig) config).isAllowHalfClosure();
1060 }
1061
1062 private void cancelConnectTimeoutFuture() {
1063 if (connectTimeoutFuture != null) {
1064 connectTimeoutFuture.cancel(false);
1065 connectTimeoutFuture = null;
1066 }
1067 }
1068
1069 private void computeRemote() {
1070 if (requestedRemoteAddress instanceof InetSocketAddress) {
1071 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1072 }
1073 }
1074
1075 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1076 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1077 }
1078
1079 private void clearPollFlag(int pollMask) {
1080 if ((pollMask & Native.POLLIN) != 0) {
1081 ioState &= ~POLL_IN_SCHEDULED;
1082 pollInId = 0;
1083 }
1084 if ((pollMask & Native.POLLOUT) != 0) {
1085 ioState &= ~POLL_OUT_SCHEDULED;
1086 pollOutId = 0;
1087 }
1088 if ((pollMask & Native.POLLRDHUP) != 0) {
1089 ioState &= ~POLL_RDHUP_SCHEDULED;
1090 pollRdhupId = 0;
1091 }
1092 }
1093 }