1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.Errors;
43 import io.netty.channel.unix.FileDescriptor;
44 import io.netty.channel.unix.UnixChannel;
45 import io.netty.channel.unix.UnixChannelUtil;
46 import io.netty.util.ReferenceCountUtil;
47 import io.netty.util.concurrent.PromiseNotifier;
48 import io.netty.util.internal.logging.InternalLogger;
49 import io.netty.util.internal.logging.InternalLoggerFactory;
50
51 import java.io.IOException;
52 import java.net.InetSocketAddress;
53 import java.net.SocketAddress;
54 import java.nio.ByteBuffer;
55 import java.nio.channels.AlreadyConnectedException;
56 import java.nio.channels.ClosedChannelException;
57 import java.nio.channels.ConnectionPendingException;
58 import java.nio.channels.NotYetConnectedException;
59 import java.nio.channels.UnresolvedAddressException;
60 import java.util.concurrent.ScheduledFuture;
61 import java.util.concurrent.TimeUnit;
62
63 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
64 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
65 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
66 import static io.netty.util.internal.ObjectUtil.checkNotNull;
67
68
69 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
// The socket backing this channel; it is also what fd() returns.
final LinuxSocket socket;
// Backing value of isActive(); set by the constructors and on a successful connect, cleared in doClose().
protected volatile boolean active;

// Bit flags kept in ioState, one per kind of operation that is currently submitted for this channel.
// NOTE(review): bit 1 (1 << 1) is not used by any flag - presumably intentional, confirm.
private static final int POLL_IN_SCHEDULED = 1;
private static final int POLL_OUT_SCHEDULED = 1 << 2;
private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
private static final int WRITE_SCHEDULED = 1 << 4;
private static final int READ_SCHEDULED = 1 << 5;
private static final int CONNECT_SCHEDULED = 1 << 6;

// Counter used by nextOpsId(); starts at the smallest short value.
private short opsId = Short.MIN_VALUE;

// Ids returned by registration.submit(...) for the outstanding poll / connect operations.
// They are passed back when those operations are removed or cancelled in cancelOps(...).
private long pollInId;
private long pollOutId;
private long pollRdhupId;
private long connectId;

// Combination of the *_SCHEDULED flags above.
private byte ioState;

// Number of writes / reads that were scheduled and have not completed yet.
private short numOutstandingWrites;
private short numOutstandingReads;

// Set by doBeginRead() when a read was requested but could not be started right away;
// reset in readComplete(...) once the last outstanding read has completed.
private boolean readPending;
// True while readComplete(...) is executing.
private boolean inReadComplete;
100
101 private final class DelayedClose {
102 private final ChannelPromise promise;
103 private final Throwable cause;
104 private final ClosedChannelException closeCause;
105
106 DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
107 this.promise = promise;
108 this.cause = cause;
109 this.closeCause = closeCause;
110 }
111 }
// Non-null once a close has been requested on the unsafe; keeps the promise and causes of that request.
private DelayedClose delayedClose;
// Set by shutdownInput(true); once set, doBeginRead() returns without scheduling anything.
private boolean inputClosedSeenErrorOnRead;

// State of a connect attempt. connectPromise is assigned in connect(...) and reset to null
// when the attempt completes, is cancelled, or the channel is closed.
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
// Memory holding the remote address written for a submitted connect; released by freeRemoteAddressMemory().
private ByteBuffer remoteAddressMemory;
// Header memory used when the connect is submitted as a sendmsg; released by freeMsgHdrArray().
private MsgHdrMemoryArray msgHdrMemoryArray;

// Assigned in doRegister(...) once registration with the event loop succeeded; null before that.
private IoUringIoRegistration registration;

// Cached addresses handed out by localAddress0() and remoteAddress0().
private volatile SocketAddress local;
private volatile SocketAddress remote;
128
129 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
130 super(parent);
131 this.socket = checkNotNull(socket, "fd");
132
133 if (active) {
134
135
136 this.active = true;
137 this.local = socket.localAddress();
138 this.remote = socket.remoteAddress();
139 }
140
141 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
142 }
143
/**
 * Creates a channel that is active from the start and whose remote address is already known.
 *
 * @param parent the parent channel passed on to the super constructor
 * @param fd     the socket backing this channel; must not be {@code null}
 * @param remote the remote address to cache; the local address is read from {@code fd}
 */
AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    active = true;

    // The remote address is supplied by the caller; only the local one is looked up.
    this.remote = remote;
    local = fd.localAddress();
}
154
155
156
157
158
159
160 protected final short nextOpsId() {
161 short id = opsId++;
162
163
164 if (id == 0) {
165 id = opsId++;
166 }
167 return id;
168 }
169
170 public final boolean isOpen() {
171 return socket.isOpen();
172 }
173
/**
 * Returns the current value of the {@code active} flag.
 */
@Override
public boolean isActive() {
    return active;
}
178
/**
 * Returns the socket backing this channel as its file descriptor.
 */
@Override
public final FileDescriptor fd() {
    return socket;
}
183
/**
 * Returns {@code unsafe()} cast to {@link AbstractUringUnsafe}.
 */
private AbstractUringUnsafe ioUringUnsafe() {
    return (AbstractUringUnsafe) unsafe();
}
187
188 @Override
189 protected boolean isCompatible(final EventLoop loop) {
190 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
191 }
192
/**
 * Copies the readable bytes of {@code buf} into a direct buffer, using {@code buf} itself as
 * the object that is released afterwards.
 */
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
    return newDirectBuffer(buf, buf);
}
196
197 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
198 final int readableBytes = buf.readableBytes();
199 if (readableBytes == 0) {
200 ReferenceCountUtil.release(holder);
201 return Unpooled.EMPTY_BUFFER;
202 }
203
204 final ByteBufAllocator alloc = alloc();
205 if (alloc.isDirectBufferPooled()) {
206 return newDirectBuffer0(holder, buf, alloc, readableBytes);
207 }
208
209 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
210 if (directBuf == null) {
211 return newDirectBuffer0(holder, buf, alloc, readableBytes);
212 }
213
214 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
215 ReferenceCountUtil.safeRelease(holder);
216 return directBuf;
217 }
218
219 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
220 final ByteBuf directBuf = alloc.directBuffer(capacity);
221 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
222 ReferenceCountUtil.safeRelease(holder);
223 return directBuf;
224 }
225
226
227
228
229
230
231
/**
 * Cancels the reads that are still outstanding for this channel.
 * Called from {@code cancelOps(...)}, i.e. when the channel is being closed or deregistered.
 *
 * @param registration        the registration of this channel
 * @param numOutstandingReads the current number of outstanding reads
 */
protected abstract void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads);
233
234
235
236
237
238
239
/**
 * Cancels the writes that are still outstanding for this channel.
 * Called from {@code cancelOps(...)}, i.e. when the channel is being closed or deregistered.
 *
 * @param registration         the registration of this channel
 * @param numOutstandingWrites the current number of outstanding writes
 */
protected abstract void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites);
241
/**
 * Does nothing in this base class.
 */
@Override
protected void doDisconnect() throws Exception {
}
245
246 protected byte flags(byte flags) {
247 if (((IOUringChannelConfig) config()).getIoseqAsync()) {
248 return (byte) (flags | Native.IOSQE_ASYNC);
249 }
250 return flags;
251 }
252
253 private void freeRemoteAddressMemory() {
254 if (remoteAddressMemory != null) {
255 Buffer.free(remoteAddressMemory);
256 remoteAddressMemory = null;
257 }
258 }
259
260 private void freeMsgHdrArray() {
261 if (msgHdrMemoryArray != null) {
262 msgHdrMemoryArray.release();
263 msgHdrMemoryArray = null;
264 }
265 }
266
267 @Override
268 protected void doClose() throws Exception {
269 active = false;
270
271 if (registration != null) {
272 if (socket.markClosed()) {
273 int fd = fd().intValue();
274 IoUringIoOps ops = IoUringIoOps.newClose(fd, flags((byte) 0), nextOpsId());
275 registration.submit(ops);
276 }
277 } else {
278
279 socket.close();
280 ioUringUnsafe().freeResourcesNowIfNeeded(null);
281 }
282 }
283
284 @Override
285 protected final void doBeginRead() {
286 if (inputClosedSeenErrorOnRead) {
287
288 return;
289 }
290 if (readPending) {
291
292 return;
293 }
294 if (inReadComplete || !isActive()) {
295
296
297
298 readPending = true;
299 return;
300 }
301 doBeginReadNow();
302 }
303
304 private void doBeginReadNow() {
305 if (!((IOUringChannelConfig) config()).getPollInFirst()) {
306
307 ioUringUnsafe().scheduleFirstReadIfNeeded();
308 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
309 ioUringUnsafe().schedulePollIn();
310 }
311 }
312
/**
 * Submits the special {@code IoUringIoHandler.SUBMIT_AND_RUN_ALL} operation through the registration.
 */
private void submitAndRunNow() {
    registration().submit(IoUringIoHandler.SUBMIT_AND_RUN_ALL);
}
318
/**
 * Schedules a write for the given outbound buffer if none is scheduled yet, allowing an
 * immediate submit-and-run.
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) {
    scheduleWriteIfNeeded(in, true);
}
323
324 protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
325 if ((ioState & WRITE_SCHEDULED) != 0) {
326 return;
327 }
328 if (scheduleWrite(in) > 0) {
329 ioState |= WRITE_SCHEDULED;
330 if (submitAndRunNow && !isWritable()) {
331 submitAndRunNow();
332 }
333 }
334 }
335
336 private int scheduleWrite(ChannelOutboundBuffer in) {
337 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
338 return 0;
339 }
340 if (in == null) {
341 return 0;
342 }
343
344 int msgCount = in.size();
345 if (msgCount == 0) {
346 return 0;
347 }
348 Object msg = in.current();
349
350 if (msgCount > 1 && in.current() instanceof ByteBuf) {
351 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
352 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
353 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
354
355 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
356 } else {
357 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
358 }
359
360 assert numOutstandingWrites > 0;
361 return numOutstandingWrites;
362 }
363
/**
 * Returns the registration of this channel; asserts that the channel has been registered.
 */
protected final IoUringIoRegistration registration() {
    assert registration != null;
    return registration;
}
368
/**
 * Submits a poll for {@code Native.POLLOUT} and remembers its id in {@code pollOutId}.
 */
private void schedulePollOut() {
    pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT);
}
372
/**
 * Submits a poll for {@code Native.POLLRDHUP} and remembers its id in {@code pollRdhupId}.
 */
final void schedulePollRdHup() {
    pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP);
}
376
377 private long schedulePollAdd(int ioMask, int mask) {
378 assert (ioState & ioMask) == 0;
379 int fd = fd().intValue();
380 IoUringIoRegistration registration = registration();
381 IoUringIoOps ops = IoUringIoOps.newPollAdd(
382 fd, flags((byte) 0), mask, (short) mask);
383 long id = registration.submit(ops);
384 if (id != 0) {
385 ioState |= ioMask;
386 }
387 return id;
388 }
389
/**
 * Re-reads the local and remote address from the socket and caches them.
 */
final void resetCachedAddresses() {
    local = socket.localAddress();
    remote = socket.remoteAddress();
}
394
395 abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
// Lazily created by recvBufAllocHandle().
private IoUringRecvByteAllocatorHandle allocHandle;
// Result of socketIsEmpty(flags) for the completion currently handled by readComplete(...);
// reset to false when readComplete(...) returns.
private boolean socketIsEmpty;

/**
 * Schedules the write of multiple messages from the given outbound buffer.
 *
 * @param in the outbound buffer
 * @return the number of writes that are outstanding afterwards; the caller asserts it is positive
 */
protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);

/**
 * Schedules the write of a single message.
 *
 * @param msg the message to write
 * @return the number of writes that are outstanding afterwards; the caller asserts it is positive
 */
protected abstract int scheduleWriteSingle(Object msg);

// Set when a close operation completed with a result other than ECANCELED.
private boolean closed;
// Guards freeResourcesNow(...) so that it runs at most once.
private boolean freed;
413
414 @Override
415 public final void handle(IoRegistration registration, IoEvent ioEvent) {
416 IoUringIoRegistration reg = (IoUringIoRegistration) registration;
417 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
418 byte op = event.opcode();
419 int res = event.res();
420 int flags = event.flags();
421 short data = event.data();
422 switch (op) {
423 case Native.IORING_OP_RECV:
424 case Native.IORING_OP_ACCEPT:
425 case Native.IORING_OP_RECVMSG:
426 case Native.IORING_OP_READ:
427 readComplete(op, res, flags, data);
428
429
430
431 handleDelayedClosed();
432 break;
433 case Native.IORING_OP_WRITEV:
434 case Native.IORING_OP_SEND:
435 case Native.IORING_OP_SENDMSG:
436 case Native.IORING_OP_WRITE:
437 case Native.IORING_OP_SPLICE:
438 writeComplete(op, res, flags, data);
439
440
441
442 handleDelayedClosed();
443 break;
444 case Native.IORING_OP_POLL_ADD:
445 pollAddComplete(res, flags, data);
446 break;
447 case Native.IORING_OP_POLL_REMOVE:
448
449
450 if (res == 0) {
451 clearPollFlag(data);
452 }
453 break;
454 case Native.IORING_OP_ASYNC_CANCEL:
455 cancelComplete0(op, res, flags, data);
456
457
458
459 handleDelayedClosed();
460 break;
461 case Native.IORING_OP_CONNECT:
462 connectComplete(op, res, flags, data);
463
464
465 freeMsgHdrArray();
466 freeRemoteAddressMemory();
467 break;
468 case Native.IORING_OP_CLOSE:
469 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
470 if (delayedClose != null) {
471 delayedClose.promise.setSuccess();
472 }
473 closed = true;
474 }
475 break;
476 }
477
478 if (ioState == 0 && closed) {
479 freeResourcesNowIfNeeded(reg);
480 }
481 }
482
483 private void freeResourcesNowIfNeeded(IoUringIoRegistration reg) {
484 if (!freed) {
485 freed = true;
486 freeResourcesNow(reg);
487 }
488 }
489
490
491
492
493
494
495 protected void freeResourcesNow(IoUringIoRegistration reg) {
496 freeMsgHdrArray();
497 freeRemoteAddressMemory();
498 if (reg != null) {
499 reg.cancel();
500 }
501 }
502
503 private void handleDelayedClosed() {
504 if (delayedClose != null && canCloseNow()) {
505 closeNow();
506 }
507 }
508
509 private void pollAddComplete(int res, int flags, short data) {
510 if ((data & Native.POLLOUT) != 0) {
511 pollOut(res);
512 }
513 if ((data & Native.POLLIN) != 0) {
514 pollIn(res);
515 }
516 if ((data & Native.POLLRDHUP) != 0) {
517 pollRdHup(res);
518 }
519 }
520
/**
 * Closes the channel using the void promise.
 */
@Override
public final void close() throws Exception {
    close(voidPromise());
}
525
/**
 * Requests the channel to be closed. The request is recorded in {@code delayedClose}, pending
 * operations are cancelled, and the actual close is performed immediately only if no read or
 * write is scheduled; otherwise it happens later from {@code handleDelayedClosed()}.
 *
 * @param promise    completed when the close has finished
 * @param cause      forwarded to the super-class close
 * @param closeCause forwarded to the super-class close
 */
@Override
protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
    if (closeFuture().isDone()) {
        // The channel is closed already; just complete the given promise.
        safeSetSuccess(promise);
        return;
    }
    if (delayedClose == null) {
        // First close request. A void promise is replaced by a fresh one, presumably because
        // later requests attach a listener to it (see the else branch).
        delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
    } else {
        // A close is in progress already: complete this promise together with the first one.
        delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
        return;
    }

    boolean cancelConnect = false;
    try {
        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
        if (connectPromise != null) {
            // A connect attempt is still pending; fail it and remember to cancel it below.
            connectPromise.tryFailure(new ClosedChannelException());
            AbstractIoUringChannel.this.connectPromise = null;
            cancelConnect = true;
        }

        cancelConnectTimeoutFuture();
    } finally {
        // Cancel whatever is still submitted, even if the block above threw.
        cancelOps(cancelConnect);
    }

    if (canCloseNow()) {
        // No read or write is scheduled, so there is nothing to wait for.
        closeNow();
    }
}
565
566 private void cancelOps(boolean cancelConnect) {
567 if (registration == null || !registration.isValid()) {
568 return;
569 }
570 int fd = fd().intValue();
571 byte flags = flags((byte) 0);
572 if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
573 registration.submit(IoUringIoOps.newPollRemove(
574 fd, flags, pollRdhupId, (short) Native.POLLRDHUP));
575 }
576 if ((ioState & POLL_IN_SCHEDULED) != 0) {
577 registration.submit(IoUringIoOps.newPollRemove(
578 fd, flags, pollInId, (short) Native.POLLIN));
579 }
580 if ((ioState & POLL_OUT_SCHEDULED) != 0) {
581 registration.submit(IoUringIoOps.newPollRemove(
582 fd, flags, pollOutId, (short) Native.POLLOUT));
583 }
584 if (cancelConnect && connectId != 0) {
585
586 registration.submit(IoUringIoOps.newAsyncCancel(
587 fd, flags, connectId, Native.IORING_OP_CONNECT));
588 }
589 cancelOutstandingReads(registration, numOutstandingReads);
590 cancelOutstandingWrites(registration, numOutstandingWrites);
591 }
592
593 private boolean canCloseNow() {
594
595
596 return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
597 }
598
/**
 * Runs the super-class close with a fresh promise and the causes stored in {@code delayedClose}.
 * The promise stored in {@code delayedClose} is not passed on here.
 */
private void closeNow() {
    super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
}
602
603 @Override
604 protected final void flush0() {
605
606
607
608 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
609 super.flush0();
610 }
611 }
612
613 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
614 if (promise == null) {
615
616 return;
617 }
618
619
620 promise.tryFailure(cause);
621 closeIfClosed();
622 }
623
/**
 * Completes a successful connect: marks the channel active, caches the addresses, schedules
 * the RDHUP poll, completes the promise and fires {@code channelActive} if the channel was
 * not active before.
 *
 * @param promise   the connect promise; nothing happens if it is {@code null}
 * @param wasActive whether the channel was active before the connect completed
 */
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        // No connect promise to complete.
        return;
    }
    active = true;

    if (local == null) {
        local = socket.localAddress();
    }
    computeRemote();

    // Start watching for the peer closing its side.
    schedulePollRdHup();

    // Read the active state before completing the promise, i.e. before listeners can run.
    boolean active = isActive();

    // trySuccess() returns false if the promise was completed already (e.g. cancelled).
    boolean promiseSet = promise.trySuccess();

    // Fire channelActive only on the inactive -> active transition.
    if (!wasActive && active) {
        pipeline().fireChannelActive();
    }

    // The promise could not be completed, so the connection is not wanted: close it.
    if (!promiseSet) {
        close(voidPromise());
    }
}
657
658 @Override
659 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
660 if (allocHandle == null) {
661 allocHandle = new IoUringRecvByteAllocatorHandle(
662 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
663 }
664 return allocHandle;
665 }
666
/**
 * Shuts down the input side of the socket, or closes the channel when half-closure is not allowed.
 *
 * @param allDataRead if {@code true}, {@code ChannelInputShutdownReadComplete} is fired (once)
 *                    and further reads are stopped
 */
final void shutdownInput(boolean allDataRead) {
    logger.trace("shutdownInput Fd: {}", fd().intValue());
    if (!socket.isInputShutdown()) {
        if (isAllowHalfClosure(config())) {
            try {
                socket.shutdown(true, false);
            } catch (IOException ignored) {
                // Shutting down the input failed; still notify the pipeline, then close the channel.
                fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
                return;
            } catch (NotYetConnectedException ignore) {
                // Deliberately ignored: the socket is not connected, so the shutdown itself is
                // skipped and the event below is fired regardless.
            }
            pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
        } else {
            // Half-closure is not allowed: close the whole channel.
            close(voidPromise());
            return;
        }
    }
    if (allDataRead && !inputClosedSeenErrorOnRead) {
        // Fire the read-complete event only once.
        inputClosedSeenErrorOnRead = true;
        pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
    }
}
693
/**
 * Fires the given user event through the pipeline and then closes the channel.
 */
private void fireEventAndClose(Object evt) {
    pipeline().fireUserEventTriggered(evt);
    close(voidPromise());
}
698
699 final void schedulePollIn() {
700 assert (ioState & POLL_IN_SCHEDULED) == 0;
701 if (!isActive() || shouldBreakIoUringInReady(config())) {
702 return;
703 }
704 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN);
705 }
706
/**
 * Handles the completion of a read-like operation: updates the outstanding-read accounting,
 * delegates to {@code readComplete0(...)} and afterwards starts a new read if one was
 * requested in the meantime.
 */
private void readComplete(byte op, int res, int flags, short data) {
    assert numOutstandingReads > 0;
    if (--numOutstandingReads == 0) {
        // That was the last outstanding read.
        readPending = false;
        ioState &= ~READ_SCHEDULED;
    }
    // Lets doBeginRead() know that it is invoked from within this callback.
    inReadComplete = true;
    try {
        socketIsEmpty = socketIsEmpty(flags);

        readComplete0(op, res, flags, data, numOutstandingReads);
    } finally {
        socketIsEmpty = false;
        inReadComplete = false;

        // A read was requested while the callback ran (see doBeginRead()); start it now
        // if the allocator handle reports the read loop as complete.
        if (readPending && recvBufAllocHandle().isReadComplete()) {
            doBeginReadNow();
        }
    }
}
728
729
730
731
/**
 * Called by {@code readComplete(...)} to process the completion of a read-like operation.
 *
 * @param op                   the opcode of the completed operation
 * @param res                  the result of the operation
 * @param flags                the completion flags
 * @param data                 the data the operation was submitted with
 * @param outstandingCompletes the number of reads still outstanding after this completion
 */
protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
733
734
735
736
737 private void pollRdHup(int res) {
738 ioState &= ~POLL_RDHUP_SCHEDULED;
739 pollRdhupId = 0;
740 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
741 return;
742 }
743
744
745 recvBufAllocHandle().rdHupReceived();
746
747 if (isActive()) {
748 scheduleFirstReadIfNeeded();
749 } else {
750
751 shutdownInput(false);
752 }
753 }
754
755
756
757
758 private void pollIn(int res) {
759 ioState &= ~POLL_IN_SCHEDULED;
760 pollInId = 0;
761 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
762 return;
763 }
764
765 scheduleFirstReadIfNeeded();
766 }
767
768 private void scheduleFirstReadIfNeeded() {
769 if ((ioState & READ_SCHEDULED) == 0) {
770 scheduleFirstRead();
771 }
772 }
773
774 private void scheduleFirstRead() {
775
776 final ChannelConfig config = config();
777 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
778 allocHandle.reset(config);
779 scheduleRead(true);
780 }
781
782 protected final void scheduleRead(boolean first) {
783
784 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
785 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
786 if (numOutstandingReads > 0) {
787 ioState |= READ_SCHEDULED;
788 }
789 }
790 }
791
792
793
794
795
796
797
798
/**
 * Schedules the actual read operation(s).
 *
 * @param first         {@code true} when called for the first read after the allocator handle was reset
 * @param socketIsEmpty the value computed by {@code socketIsEmpty(flags)} for the completion
 *                      currently being handled, or {@code false} outside of a read completion
 * @return the number of reads that were scheduled; a value greater than zero marks a read as scheduled
 */
protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
800
801
802
803
804
805
/**
 * Handles the completion of the OUT poll: either finishes a pending connect or flushes
 * pending writes.
 *
 * @param res the result of the poll; a cancelled poll only clears the bookkeeping
 */
private void pollOut(int res) {
    ioState &= ~POLL_OUT_SCHEDULED;
    pollOutId = 0;
    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        return;
    }

    if (connectPromise != null) {
        // A connect is pending, so writability here signals progress of that connect.
        assert eventLoop().inEventLoop();

        boolean connectStillInProgress = false;
        try {
            boolean wasActive = isActive();
            if (!socket.finishConnect()) {
                // Not connected yet; the finally block polls again.
                connectStillInProgress = true;
                return;
            }
            fulfillConnectPromise(connectPromise, wasActive);
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
        } finally {
            if (!connectStillInProgress) {
                // The attempt has finished, successfully or not: drop its timeout and promise.
                cancelConnectTimeoutFuture();
                connectPromise = null;
            } else {
                // Wait for the socket to become writable again.
                schedulePollOut();
            }
        }
    } else if (!socket.isOutputShutdown()) {
        // Writable again: try to write what is pending.
        super.flush0();
    }
}
846
847
848
849
850
851
852
853
854
/**
 * Handles the completion of a write-like operation. While a connect is scheduled the
 * completion belongs to the sendmsg that was submitted by {@code connect(...)} and is
 * treated as part of the connect; otherwise the outstanding-write accounting is updated.
 */
private void writeComplete(byte op, int res, int flags, short data) {
    if ((ioState & CONNECT_SCHEDULED) != 0) {
        // This completes the sendmsg used to connect; its header memory is no longer needed.
        freeMsgHdrArray();
        if (res > 0) {
            // Data was sent along with the connect: drop it from the outbound buffer ...
            outboundBuffer().removeBytes(res);

            // ... and report the connect as successful.
            connectComplete(op, 0, flags, data);
        } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
            // Nothing was sent; fall back to submitting a regular connect.
            submitConnect((InetSocketAddress) requestedRemoteAddress);
        } else {
            // Any other result is passed on as the connect result.
            connectComplete(op, res, flags, data);
        }
        return;
    }
    assert numOutstandingWrites > 0;
    --numOutstandingWrites;

    boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
    if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
        // Not everything was written: wait until the socket is writable again.
        schedulePollOut();
    }

    if (numOutstandingWrites == 0) {
        ioState &= ~WRITE_SCHEDULED;

        // Everything was written and no writability poll is pending: schedule the next write, if any.
        if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
            scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
        }
    }
}
899
900
901
902
903
904
905
906
907
/**
 * Called by {@code writeComplete(...)} to process the completion of a write-like operation.
 *
 * @param op          the opcode of the completed operation
 * @param res         the result of the operation
 * @param flags       the completion flags
 * @param data        the data the operation was submitted with
 * @param outstanding the number of writes still outstanding after this completion
 * @return {@code true} if everything was written; {@code false} makes the caller poll for writability
 */
abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
909
910
911
912
913
914
915
916
917
/**
 * Called when an async-cancel operation completed. Does nothing by default.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the operation
 * @param flags the completion flags
 * @param data  the data the operation was submitted with
 */
void cancelComplete0(byte op, int res, int flags, short data) {
    // Nothing to do in the base implementation.
}
921
922
923
924
925
926
927
928
/**
 * Handles the result of a connect.
 *
 * @param op    the opcode of the completed operation
 * @param res   {@code 0} on success, EINPROGRESS/EALREADY (negative) if the connect is still
 *              ongoing, any other value is turned into a connect exception
 * @param flags the completion flags
 * @param data  the data the operation was submitted with
 */
void connectComplete(byte op, int res, int flags, short data) {
    ioState &= ~CONNECT_SCHEDULED;
    freeRemoteAddressMemory();

    if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
        // Still connecting: wait for the socket to become writable (see pollOut(int)).
        schedulePollOut();
    } else {
        try {
            if (res == 0) {
                fulfillConnectPromise(connectPromise, active);
                if (readPending) {
                    // A read was requested before the channel became active; start it now.
                    doBeginReadNow();
                }
            } else {
                try {
                    Errors.throwConnectException("io_uring connect", res);
                } catch (Throwable cause) {
                    fulfillConnectPromise(connectPromise, cause);
                }
            }
        } finally {
            // The attempt has finished, successfully or not: drop its timeout and promise.
            cancelConnectTimeoutFuture();
            connectPromise = null;
        }
    }
}
959
/**
 * Starts connecting to {@code remoteAddress}, binding to {@code localAddress} first if one
 * is given. The connect is submitted through the registration and {@code promise} is
 * completed when its completion is handled. A timeout task is scheduled if the configured
 * connect timeout is positive.
 *
 * @param remoteAddress the address to connect to
 * @param localAddress  the address to bind to, or {@code null}
 * @param promise       the promise to complete with the outcome
 */
@Override
public void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // Nothing to do if the promise is completed already or the channel is not open.
    if (promise.isDone() || !ensureOpen(promise)) {
        return;
    }

    if (delayedClose != null) {
        // A close has been requested already; do not start a connect.
        promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
        return;
    }
    try {
        if (connectPromise != null) {
            throw new ConnectionPendingException();
        }
        if (localAddress instanceof InetSocketAddress) {
            checkResolvable((InetSocketAddress) localAddress);
        }

        if (remoteAddress instanceof InetSocketAddress) {
            checkResolvable((InetSocketAddress) remoteAddress);
        }

        if (remote != null) {
            // A remote address is cached already, so the channel has been connected before.
            throw new AlreadyConnectedException();
        }

        if (localAddress != null) {
            socket.bind(localAddress);
        }

        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;

        ByteBuf initialData = null;
        if (IoUring.isTcpFastOpenClientSideAvailable() &&
            config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
            // TCP Fast Open: if the first flushed message is a ByteBuf, send it with the connect.
            ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
            outbound.addFlush();
            Object curr;
            if ((curr = outbound.current()) instanceof ByteBuf) {
                initialData = (ByteBuf) curr;
            }
        }
        if (initialData != null) {
            // Connect by submitting a sendmsg with MSG_FASTOPEN that carries the initial data.
            msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
            MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
            hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
                    initialData.readableBytes(), (short) 0);

            int fd = fd().intValue();
            IoUringIoRegistration registration = registration();
            IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, flags((byte) 0), Native.MSG_FASTOPEN,
                    hdr.address(), hdr.idx());
            connectId = registration.submit(ops);
            if (connectId == 0) {
                // The submission did not return an id; release the header memory right away.
                freeMsgHdrArray();
            }
        } else {
            submitConnect(inetSocketAddress);
        }
        if (connectId != 0) {
            ioState |= CONNECT_SCHEDULED;
        }
    } catch (Throwable t) {
        closeIfClosed();
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        return;
    }
    connectPromise = promise;
    requestedRemoteAddress = remoteAddress;

    // Fail the attempt and close the channel if it does not finish within the configured timeout.
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                if (connectPromise != null && !connectPromise.isDone() &&
                        connectPromise.tryFailure(new ConnectTimeoutException(
                                "connection timed out: " + remoteAddress))) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    promise.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            // If the caller cancelled the connect, drop the attempt and close the channel.
            if (future.isCancelled()) {
                cancelConnectTimeoutFuture();
                connectPromise = null;
                close(voidPromise());
            }
        }
    });
}
1065 }
1066
1067 private void submitConnect(InetSocketAddress inetSocketAddress) {
1068 remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1069 long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
1070
1071 SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
1072
1073 int fd = fd().intValue();
1074 IoUringIoRegistration registration = registration();
1075 IoUringIoOps ops = IoUringIoOps.newConnect(
1076 fd, flags((byte) 0), remoteAddressMemoryAddress, nextOpsId());
1077 connectId = registration.submit(ops);
1078 if (connectId == 0) {
1079
1080 freeRemoteAddressMemory();
1081 }
1082 }
1083
1084 @Override
1085 protected Object filterOutboundMessage(Object msg) {
1086 if (msg instanceof ByteBuf) {
1087 ByteBuf buf = (ByteBuf) msg;
1088 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1089 }
1090 throw new UnsupportedOperationException("unsupported message type");
1091 }
1092
1093 @Override
1094 protected void doRegister(ChannelPromise promise) {
1095 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1096 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1097 if (f.isSuccess()) {
1098 registration = (IoUringIoRegistration) f.getNow();
1099 promise.setSuccess();
1100 } else {
1101 promise.setFailure(f.cause());
1102 }
1103 });
1104 }
1105
1106 @Override
1107 protected final void doDeregister() {
1108
1109 ioUringUnsafe().cancelOps(connectPromise != null);
1110 }
1111
/**
 * Binds the socket to the given address and caches the resulting local address.
 *
 * @param local the address to bind to; an unresolved {@link InetSocketAddress} is rejected
 *              by {@code checkResolvable(...)}
 */
@Override
protected void doBind(final SocketAddress local) throws Exception {
    if (local instanceof InetSocketAddress) {
        checkResolvable((InetSocketAddress) local);
    }
    socket.bind(local);
    // Cache what the socket reports rather than the requested address.
    this.local = socket.localAddress();
}
1120
1121 protected static void checkResolvable(InetSocketAddress addr) {
1122 if (addr.isUnresolved()) {
1123 throw new UnresolvedAddressException();
1124 }
1125 }
1126
/**
 * Returns the cached local address.
 */
@Override
protected final SocketAddress localAddress0() {
    return local;
}
1131
/**
 * Returns the cached remote address.
 */
@Override
protected final SocketAddress remoteAddress0() {
    return remote;
}
1136
1137 private static boolean isAllowHalfClosure(ChannelConfig config) {
1138 return config instanceof SocketChannelConfig &&
1139 ((SocketChannelConfig) config).isAllowHalfClosure();
1140 }
1141
1142 private void cancelConnectTimeoutFuture() {
1143 if (connectTimeoutFuture != null) {
1144 connectTimeoutFuture.cancel(false);
1145 connectTimeoutFuture = null;
1146 }
1147 }
1148
1149 private void computeRemote() {
1150 if (requestedRemoteAddress instanceof InetSocketAddress) {
1151 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1152 }
1153 }
1154
1155 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1156 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1157 }
1158
1159 private void clearPollFlag(int pollMask) {
1160 if ((pollMask & Native.POLLIN) != 0) {
1161 ioState &= ~POLL_IN_SCHEDULED;
1162 pollInId = 0;
1163 }
1164 if ((pollMask & Native.POLLOUT) != 0) {
1165 ioState &= ~POLL_OUT_SCHEDULED;
1166 pollOutId = 0;
1167 }
1168 if ((pollMask & Native.POLLRDHUP) != 0) {
1169 ioState &= ~POLL_RDHUP_SCHEDULED;
1170 pollRdhupId = 0;
1171 }
1172 }
1173
1174
1175
1176
1177
1178
1179
/**
 * Derives from the flags of a read completion whether the socket is empty. The result is
 * handed to {@code scheduleRead0(...)} for reads scheduled while that completion is handled.
 *
 * @param flags the completion flags
 * @return {@code true} if the socket is considered empty
 */
protected abstract boolean socketIsEmpty(int flags);
1181 }