1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.DomainSocketAddress;
43 import io.netty.channel.unix.Errors;
44 import io.netty.channel.unix.FileDescriptor;
45 import io.netty.channel.unix.UnixChannel;
46 import io.netty.channel.unix.UnixChannelUtil;
47 import io.netty.util.ReferenceCountUtil;
48 import io.netty.util.concurrent.PromiseNotifier;
49 import io.netty.util.internal.CleanableDirectBuffer;
50 import io.netty.util.internal.StringUtil;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.InetSocketAddress;
56 import java.net.SocketAddress;
57 import java.nio.ByteBuffer;
58 import java.nio.channels.AlreadyConnectedException;
59 import java.nio.channels.ClosedChannelException;
60 import java.nio.channels.ConnectionPendingException;
61 import java.nio.channels.NotYetConnectedException;
62 import java.nio.channels.UnresolvedAddressException;
63 import java.util.concurrent.ScheduledFuture;
64 import java.util.concurrent.TimeUnit;
65
66 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
67 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
68 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
69 import static io.netty.util.internal.ObjectUtil.checkNotNull;
70 import static io.netty.util.internal.StringUtil.className;
71
72
73 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
74 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
75 final LinuxSocket socket;
76 protected volatile boolean active;
77
78
79 private static final int POLL_IN_SCHEDULED = 1;
80 private static final int POLL_OUT_SCHEDULED = 1 << 2;
81 private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
82 private static final int WRITE_SCHEDULED = 1 << 4;
83 private static final int READ_SCHEDULED = 1 << 5;
84 private static final int CONNECT_SCHEDULED = 1 << 6;
85
86 private short opsId = Short.MIN_VALUE;
87
88 private long pollInId;
89 private long pollOutId;
90 private long pollRdhupId;
91 private long connectId;
92
93
94 private byte ioState;
95
96
97
98
99 private short numOutstandingWrites;
100
101 private short numOutstandingReads;
102
103 private boolean readPending;
104 private boolean inReadComplete;
105 private boolean socketHasMoreData;
106
107 private static final class DelayedClose {
108 private final ChannelPromise promise;
109 private final Throwable cause;
110 private final ClosedChannelException closeCause;
111
112 DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
113 this.promise = promise;
114 this.cause = cause;
115 this.closeCause = closeCause;
116 }
117 }
118 private DelayedClose delayedClose;
119 private boolean inputClosedSeenErrorOnRead;
120
121
122
123
124 private ChannelPromise connectPromise;
125 private ScheduledFuture<?> connectTimeoutFuture;
126 private SocketAddress requestedRemoteAddress;
127 private CleanableDirectBuffer cleanable;
128 private ByteBuffer remoteAddressMemory;
129 private MsgHdrMemoryArray msgHdrMemoryArray;
130
131 private IoRegistration registration;
132
133 private volatile SocketAddress local;
134 private volatile SocketAddress remote;
135
136 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
137 super(parent);
138 this.socket = checkNotNull(socket, "fd");
139
140 if (active) {
141
142
143 this.active = true;
144 this.local = socket.localAddress();
145 this.remote = socket.remoteAddress();
146 }
147
148 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
149 }
150
151 AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
152 super(parent);
153 this.socket = checkNotNull(fd, "fd");
154 this.active = true;
155
156
157
158 this.remote = remote;
159 this.local = fd.localAddress();
160 }
161
162
163 final void autoReadCleared() {
164 if (!isRegistered()) {
165 return;
166 }
167 IoRegistration registration = this.registration;
168 if (registration == null || !registration.isValid()) {
169 return;
170 }
171 if (eventLoop().inEventLoop()) {
172 clearRead();
173 } else {
174 eventLoop().execute(this::clearRead);
175 }
176 }
177
178 private void clearRead() {
179 assert eventLoop().inEventLoop();
180 readPending = false;
181 IoRegistration registration = this.registration;
182 if (registration == null || !registration.isValid()) {
183 return;
184 }
185
186 cancelOutstandingReads(registration(), numOutstandingReads);
187 }
188
189
190
191
192
193
194 protected final short nextOpsId() {
195 short id = opsId++;
196
197
198 if (id == 0) {
199 id = opsId++;
200 }
201 return id;
202 }
203
204 public final boolean isOpen() {
205 return socket.isOpen();
206 }
207
208 @Override
209 public boolean isActive() {
210 return active;
211 }
212
213 @Override
214 public final FileDescriptor fd() {
215 return socket;
216 }
217
218 private AbstractUringUnsafe ioUringUnsafe() {
219 return (AbstractUringUnsafe) unsafe();
220 }
221
222 @Override
223 protected boolean isCompatible(final EventLoop loop) {
224 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
225 }
226
227 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
228 return newDirectBuffer(buf, buf);
229 }
230
231 protected boolean allowMultiShotPollIn() {
232 return IoUring.isPollAddMultishotEnabled();
233 }
234
235 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
236 final int readableBytes = buf.readableBytes();
237 if (readableBytes == 0) {
238 ReferenceCountUtil.release(holder);
239 return Unpooled.EMPTY_BUFFER;
240 }
241
242 final ByteBufAllocator alloc = alloc();
243 if (alloc.isDirectBufferPooled()) {
244 return newDirectBuffer0(holder, buf, alloc, readableBytes);
245 }
246
247 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
248 if (directBuf == null) {
249 return newDirectBuffer0(holder, buf, alloc, readableBytes);
250 }
251
252 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
253 ReferenceCountUtil.safeRelease(holder);
254 return directBuf;
255 }
256
257 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
258 final ByteBuf directBuf = alloc.directBuffer(capacity);
259 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
260 ReferenceCountUtil.safeRelease(holder);
261 return directBuf;
262 }
263
264
265
266
267
268
269
270 protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
271
272
273
274
275
276
277
278 protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
279
280 @Override
281 protected void doDisconnect() throws Exception {
282 }
283
284 private void freeRemoteAddressMemory() {
285 if (remoteAddressMemory != null) {
286 cleanable.clean();
287 cleanable = null;
288 remoteAddressMemory = null;
289 }
290 }
291
292 private void freeMsgHdrArray() {
293 if (msgHdrMemoryArray != null) {
294 msgHdrMemoryArray.release();
295 msgHdrMemoryArray = null;
296 }
297 }
298
299 @Override
300 protected void doClose() throws Exception {
301 active = false;
302
303 if (registration != null) {
304 if (socket.markClosed()) {
305 int fd = fd().intValue();
306 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
307 registration.submit(ops);
308 }
309 } else {
310
311 socket.close();
312 ioUringUnsafe().unregistered();
313 }
314 }
315
316 @Override
317 protected final void doBeginRead() {
318 if (inputClosedSeenErrorOnRead) {
319
320 return;
321 }
322 if (readPending) {
323
324 return;
325 }
326 readPending = true;
327 if (inReadComplete || !isActive()) {
328
329
330
331 return;
332 }
333 doBeginReadNow();
334 }
335
336 private void doBeginReadNow() {
337 if (inputClosedSeenErrorOnRead) {
338
339 return;
340 }
341 if (!isPollInFirst() ||
342
343
344 socketHasMoreData) {
345
346 ioUringUnsafe().scheduleFirstReadIfNeeded();
347 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
348 ioUringUnsafe().schedulePollIn();
349 }
350 }
351
352 @Override
353 protected void doWrite(ChannelOutboundBuffer in) {
354 scheduleWriteIfNeeded(in, true);
355 }
356
357 protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
358 if ((ioState & WRITE_SCHEDULED) != 0) {
359 return;
360 }
361 if (scheduleWrite(in) > 0) {
362 ioState |= WRITE_SCHEDULED;
363 if (submitAndRunNow && !isWritable()) {
364 submitAndRunNow();
365 }
366 }
367 }
368
369 protected void submitAndRunNow() {
370
371 }
372
373 private int scheduleWrite(ChannelOutboundBuffer in) {
374 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
375 return 0;
376 }
377 if (in == null) {
378 return 0;
379 }
380
381 int msgCount = in.size();
382 if (msgCount == 0) {
383 return 0;
384 }
385 Object msg = in.current();
386
387 if (msgCount > 1 && in.current() instanceof ByteBuf) {
388 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
389 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
390 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
391
392 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
393 } else {
394 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
395 }
396
397 assert numOutstandingWrites > 0;
398 return numOutstandingWrites;
399 }
400
401 protected final IoRegistration registration() {
402 assert registration != null;
403 return registration;
404 }
405
406 private void schedulePollOut() {
407 pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
408 }
409
410 final void schedulePollRdHup() {
411 pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
412 }
413
414 private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
415 assert (ioState & ioMask) == 0;
416 int fd = fd().intValue();
417 IoRegistration registration = registration();
418 IoUringIoOps ops = IoUringIoOps.newPollAdd(
419 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
420 long id = registration.submit(ops);
421 if (id != 0) {
422 ioState |= (byte) ioMask;
423 }
424 return id;
425 }
426
427 final void resetCachedAddresses() {
428 local = socket.localAddress();
429 remote = socket.remoteAddress();
430 }
431
432 protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
433 private IoUringRecvByteAllocatorHandle allocHandle;
434 private boolean closed;
435 private boolean socketIsEmpty;
436
437
438
439
440
441 protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
442
443
444
445
446
447 protected abstract int scheduleWriteSingle(Object msg);
448
449 @Override
450 public final void handle(IoRegistration registration, IoEvent ioEvent) {
451 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
452 byte op = event.opcode();
453 int res = event.res();
454 int flags = event.flags();
455 short data = event.data();
456 switch (op) {
457 case Native.IORING_OP_RECV:
458 case Native.IORING_OP_ACCEPT:
459 case Native.IORING_OP_RECVMSG:
460 case Native.IORING_OP_READ:
461 readComplete(op, res, flags, data);
462 break;
463 case Native.IORING_OP_WRITEV:
464 case Native.IORING_OP_SEND:
465 case Native.IORING_OP_SENDMSG:
466 case Native.IORING_OP_WRITE:
467 case Native.IORING_OP_SPLICE:
468 case Native.IORING_OP_SEND_ZC:
469 case Native.IORING_OP_SENDMSG_ZC:
470 writeComplete(op, res, flags, data);
471 break;
472 case Native.IORING_OP_POLL_ADD:
473 pollAddComplete(res, flags, data);
474 break;
475 case Native.IORING_OP_ASYNC_CANCEL:
476 cancelComplete0(op, res, flags, data);
477 break;
478 case Native.IORING_OP_CONNECT:
479 connectComplete(op, res, flags, data);
480
481
482 freeMsgHdrArray();
483 freeRemoteAddressMemory();
484 break;
485 case Native.IORING_OP_CLOSE:
486 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
487 if (delayedClose != null) {
488 delayedClose.promise.setSuccess();
489 }
490 closed = true;
491 }
492 break;
493 default:
494 break;
495 }
496
497
498
499 handleDelayedClosed();
500
501 if (ioState == 0 && closed) {
502
503 registration.cancel();
504 }
505 }
506
507 @Override
508 public void unregistered() {
509 freeMsgHdrArray();
510 freeRemoteAddressMemory();
511 }
512
513 private void handleDelayedClosed() {
514 if (delayedClose != null && canCloseNow()) {
515 closeNow();
516 }
517 }
518
519 private void pollAddComplete(int res, int flags, short data) {
520 if ((res & Native.POLLOUT) != 0) {
521 pollOut(res);
522 }
523 if ((res & Native.POLLIN) != 0) {
524 pollIn(res, flags, data);
525 }
526 if ((res & Native.POLLRDHUP) != 0) {
527 pollRdHup(res);
528 }
529 }
530
531 @Override
532 public final void close() throws Exception {
533 close(voidPromise());
534 }
535
536 @Override
537 protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
538 if (closeFuture().isDone()) {
539
540 safeSetSuccess(promise);
541 return;
542 }
543 if (delayedClose == null) {
544
545
546
547 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
548 } else {
549 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
550 return;
551 }
552
553 boolean cancelConnect = false;
554 try {
555 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
556 if (connectPromise != null) {
557
558 connectPromise.tryFailure(new ClosedChannelException());
559 AbstractIoUringChannel.this.connectPromise = null;
560 cancelConnect = true;
561 }
562
563 cancelConnectTimeoutFuture();
564 } finally {
565
566
567 cancelOps(cancelConnect);
568 }
569
570 if (canCloseNow()) {
571
572 closeNow();
573 }
574 }
575
576 private void cancelOps(boolean cancelConnect) {
577 if (registration == null || !registration.isValid()) {
578 return;
579 }
580 byte flags = (byte) 0;
581 if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
582 long id = registration.submit(
583 IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
584 assert id != 0;
585 pollRdhupId = 0;
586 }
587 if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
588 long id = registration.submit(
589 IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
590 assert id != 0;
591 pollInId = 0;
592 }
593 if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
594 long id = registration.submit(
595 IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
596 assert id != 0;
597 pollOutId = 0;
598 }
599 if (cancelConnect && connectId != 0) {
600
601 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
602 assert id != 0;
603 connectId = 0;
604 }
605 cancelOutstandingReads(registration, numOutstandingReads);
606 cancelOutstandingWrites(registration, numOutstandingWrites);
607 }
608
609 private boolean canCloseNow() {
610
611
612 return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
613 }
614
615 protected boolean canCloseNow0() {
616 return true;
617 }
618
619 private void closeNow() {
620 super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
621 }
622
623 @Override
624 protected final void flush0() {
625
626
627
628 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
629 super.flush0();
630 }
631 }
632
633 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
634 if (promise == null) {
635
636 return;
637 }
638
639
640 promise.tryFailure(cause);
641 closeIfClosed();
642 }
643
644 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
645 if (promise == null) {
646
647 return;
648 }
649 active = true;
650
651 if (local == null) {
652 local = socket.localAddress();
653 }
654 computeRemote();
655
656
657 schedulePollRdHup();
658
659
660
661 boolean active = isActive();
662
663
664 boolean promiseSet = promise.trySuccess();
665
666
667
668 if (!wasActive && active) {
669 pipeline().fireChannelActive();
670 }
671
672
673 if (!promiseSet) {
674 close(voidPromise());
675 }
676 }
677
678 @Override
679 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
680 if (allocHandle == null) {
681 allocHandle = new IoUringRecvByteAllocatorHandle(
682 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
683 }
684 return allocHandle;
685 }
686
687 final void shutdownInput(boolean allDataRead) {
688 logger.trace("shutdownInput Fd: {}", fd().intValue());
689 if (!socket.isInputShutdown()) {
690 if (isAllowHalfClosure(config())) {
691 try {
692 socket.shutdown(true, false);
693 } catch (IOException ignored) {
694
695
696 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
697 return;
698 } catch (NotYetConnectedException ignore) {
699
700
701 }
702 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
703 } else {
704
705 inputClosedSeenErrorOnRead = true;
706 close(voidPromise());
707 return;
708 }
709 }
710 if (allDataRead && !inputClosedSeenErrorOnRead) {
711 inputClosedSeenErrorOnRead = true;
712 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
713 }
714 }
715
716 private void fireEventAndClose(Object evt) {
717 pipeline().fireUserEventTriggered(evt);
718 close(voidPromise());
719 }
720
721 final void schedulePollIn() {
722 assert (ioState & POLL_IN_SCHEDULED) == 0;
723 if (!isActive() || shouldBreakIoUringInReady(config())) {
724 return;
725 }
726 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
727 }
728
729 private void readComplete(byte op, int res, int flags, short data) {
730 assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;
731
732 boolean multishot = numOutstandingReads == -1;
733 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
734 if (rearm) {
735
736
737 ioState &= ~READ_SCHEDULED;
738 }
739 boolean pending = readPending;
740 if (multishot) {
741
742
743 readPending = false;
744 } else if (--numOutstandingReads == 0) {
745
746 readPending = false;
747 ioState &= ~READ_SCHEDULED;
748 }
749 inReadComplete = true;
750 try {
751 socketIsEmpty = socketIsEmpty(flags);
752 socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
753 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
754 readComplete0(op, res, flags, data, numOutstandingReads);
755 } finally {
756 try {
757
758 if (recvBufAllocHandle().isReadComplete()) {
759
760 recvBufAllocHandle().reset(config());
761
762
763 if (!multishot) {
764 if (readPending) {
765
766
767 doBeginReadNow();
768 }
769 } else {
770
771
772
773 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
774
775
776
777
778 if (pending) {
779 doBeginReadNow();
780 }
781 } else if (rearm) {
782
783 doBeginReadNow();
784 } else if (!readPending) {
785
786
787 cancelOutstandingReads(registration, numOutstandingReads);
788 }
789 }
790 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
791
792
793
794
795 if (pending) {
796 doBeginReadNow();
797 }
798 } else if (multishot && rearm) {
799
800 doBeginReadNow();
801 }
802 } finally {
803 inReadComplete = false;
804 socketIsEmpty = false;
805 }
806 }
807 }
808
809
810
811
812 protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
813
814
815
816
817 private void pollRdHup(int res) {
818 ioState &= ~POLL_RDHUP_SCHEDULED;
819 pollRdhupId = 0;
820 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
821 return;
822 }
823
824
825 recvBufAllocHandle().rdHupReceived();
826
827 if (isActive()) {
828 scheduleFirstReadIfNeeded();
829 } else {
830
831 shutdownInput(false);
832 }
833 }
834
835
836
837
838 private void pollIn(int res, int flags, short data) {
839
840 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
841 if (rearm) {
842 ioState &= ~POLL_IN_SCHEDULED;
843 pollInId = 0;
844 }
845 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
846 return;
847 }
848 if (!readPending) {
849
850
851 socketHasMoreData = true;
852 return;
853 }
854 scheduleFirstReadIfNeeded();
855 }
856
857 private void scheduleFirstReadIfNeeded() {
858 if ((ioState & READ_SCHEDULED) == 0) {
859 scheduleFirstRead();
860 }
861 }
862
863 private void scheduleFirstRead() {
864
865 final ChannelConfig config = config();
866 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
867 allocHandle.reset(config);
868 scheduleRead(true);
869 }
870
871 protected final void scheduleRead(boolean first) {
872
873 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
874 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
875 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
876 ioState |= READ_SCHEDULED;
877 }
878 }
879 }
880
881
882
883
884
885
886
887
888
889
890
891 protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
892
893
894
895
896
897
898 private void pollOut(int res) {
899 ioState &= ~POLL_OUT_SCHEDULED;
900 pollOutId = 0;
901 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
902 return;
903 }
904
905 if (connectPromise != null) {
906
907
908
909 assert eventLoop().inEventLoop();
910
911 boolean connectStillInProgress = false;
912 try {
913 boolean wasActive = isActive();
914 if (!socket.finishConnect()) {
915 connectStillInProgress = true;
916 return;
917 }
918 fulfillConnectPromise(connectPromise, wasActive);
919 } catch (Throwable t) {
920 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
921 } finally {
922 if (!connectStillInProgress) {
923
924
925
926 cancelConnectTimeoutFuture();
927 connectPromise = null;
928 } else {
929
930 schedulePollOut();
931 }
932 }
933 } else if (!socket.isOutputShutdown()) {
934
935 super.flush0();
936 }
937 }
938
939
940
941
942
943
944
945
946
947 private void writeComplete(byte op, int res, int flags, short data) {
948 if ((ioState & CONNECT_SCHEDULED) != 0) {
949
950
951 freeMsgHdrArray();
952 if (res > 0) {
953
954 outboundBuffer().removeBytes(res);
955
956
957 connectComplete(op, 0, flags, data);
958 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
959
960
961
962
963 submitConnect((InetSocketAddress) requestedRemoteAddress);
964 } else {
965
966 connectComplete(op, res, flags, data);
967 }
968 return;
969 }
970
971 if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
972 assert numOutstandingWrites > 0;
973 --numOutstandingWrites;
974 }
975
976 boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
977 if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
978
979
980 schedulePollOut();
981 }
982
983
984
985 if (numOutstandingWrites == 0) {
986 ioState &= ~WRITE_SCHEDULED;
987
988
989 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
990 scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
991 }
992 }
993 }
994
995
996
997
998
999
1000
1001
1002
1003 abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013 void cancelComplete0(byte op, int res, int flags, short data) {
1014
1015 }
1016
1017
1018
1019
1020
1021
1022
1023
1024 void connectComplete(byte op, int res, int flags, short data) {
1025 ioState &= ~CONNECT_SCHEDULED;
1026 freeRemoteAddressMemory();
1027
1028 if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1029
1030 schedulePollOut();
1031 } else {
1032 try {
1033 if (res == 0) {
1034 fulfillConnectPromise(connectPromise, active);
1035 if (readPending) {
1036 doBeginReadNow();
1037 }
1038 } else {
1039 try {
1040 Errors.throwConnectException("io_uring connect", res);
1041 } catch (Throwable cause) {
1042 fulfillConnectPromise(connectPromise, cause);
1043 }
1044 }
1045 } finally {
1046
1047
1048
1049 cancelConnectTimeoutFuture();
1050 connectPromise = null;
1051 }
1052 }
1053 }
1054
1055 @Override
1056 public void connect(
1057 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
1058
1059
1060 if (promise.isDone() || !ensureOpen(promise)) {
1061 return;
1062 }
1063
1064 if (delayedClose != null) {
1065 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
1066 return;
1067 }
1068 try {
1069 if (connectPromise != null) {
1070 throw new ConnectionPendingException();
1071 }
1072 if (localAddress instanceof InetSocketAddress) {
1073 checkResolvable((InetSocketAddress) localAddress);
1074 }
1075
1076 if (remoteAddress instanceof InetSocketAddress) {
1077 checkResolvable((InetSocketAddress) remoteAddress);
1078 }
1079
1080 if (remote != null) {
1081
1082
1083
1084 throw new AlreadyConnectedException();
1085 }
1086
1087 if (localAddress != null) {
1088 socket.bind(localAddress);
1089 }
1090
1091 if (remoteAddress instanceof InetSocketAddress) {
1092 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
1093 ByteBuf initialData = null;
1094 if (IoUring.isTcpFastOpenClientSideAvailable() &&
1095 config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
1096 ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
1097 outbound.addFlush();
1098 Object curr;
1099 if ((curr = outbound.current()) instanceof ByteBuf) {
1100 initialData = (ByteBuf) curr;
1101 }
1102 }
1103 if (initialData != null) {
1104 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
1105 MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
1106 hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
1107 initialData.readableBytes(), (short) 0);
1108
1109 int fd = fd().intValue();
1110 IoRegistration registration = registration();
1111 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
1112 hdr.address(), hdr.idx());
1113 connectId = registration.submit(ops);
1114 if (connectId == 0) {
1115
1116 freeMsgHdrArray();
1117 }
1118 } else {
1119 submitConnect(inetSocketAddress);
1120 }
1121 } else if (remoteAddress instanceof DomainSocketAddress) {
1122 DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
1123 submitConnect(unixDomainSocketAddress);
1124 } else {
1125 throw new Error("Unexpected SocketAddress implementation " + className(remoteAddress));
1126 }
1127
1128 if (connectId != 0) {
1129 ioState |= CONNECT_SCHEDULED;
1130 }
1131 } catch (Throwable t) {
1132 closeIfClosed();
1133 promise.tryFailure(annotateConnectException(t, remoteAddress));
1134 return;
1135 }
1136 connectPromise = promise;
1137 requestedRemoteAddress = remoteAddress;
1138
1139 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1140 if (connectTimeoutMillis > 0) {
1141 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
1142 @Override
1143 public void run() {
1144 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
1145 if (connectPromise != null && !connectPromise.isDone() &&
1146 connectPromise.tryFailure(new ConnectTimeoutException(
1147 "connection timed out: " + remoteAddress))) {
1148 close(voidPromise());
1149 }
1150 }
1151 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1152 }
1153
1154 promise.addListener(new ChannelFutureListener() {
1155 @Override
1156 public void operationComplete(ChannelFuture future) {
1157
1158
1159 if (future.isCancelled()) {
1160 cancelConnectTimeoutFuture();
1161 connectPromise = null;
1162 close(voidPromise());
1163 }
1164 }
1165 });
1166 }
1167 }
1168
1169 private void submitConnect(InetSocketAddress inetSocketAddress) {
1170 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1171 remoteAddressMemory = cleanable.buffer();
1172
1173 SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1174
1175 int fd = fd().intValue();
1176 IoRegistration registration = registration();
1177 IoUringIoOps ops = IoUringIoOps.newConnect(
1178 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1179 connectId = registration.submit(ops);
1180 if (connectId == 0) {
1181
1182 freeRemoteAddressMemory();
1183 }
1184 }
1185
1186 private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1187 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1188 remoteAddressMemory = cleanable.buffer();
1189 int addrLen = SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1190 int fd = fd().intValue();
1191 IoRegistration registration = registration();
1192 long addr = Buffer.memoryAddress(remoteAddressMemory);
1193 IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, addrLen, nextOpsId());
1194 connectId = registration.submit(ops);
1195 if (connectId == 0) {
1196
1197 freeRemoteAddressMemory();
1198 }
1199 }
1200
1201 @Override
1202 protected Object filterOutboundMessage(Object msg) {
1203 if (msg instanceof ByteBuf) {
1204 ByteBuf buf = (ByteBuf) msg;
1205 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1206 }
1207 throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
1208 }
1209
1210 @Override
1211 protected void doRegister(ChannelPromise promise) {
1212 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1213 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1214 if (f.isSuccess()) {
1215 registration = (IoRegistration) f.getNow();
1216 promise.setSuccess();
1217 } else {
1218 promise.setFailure(f.cause());
1219 }
1220 });
1221 }
1222
1223 @Override
1224 protected final void doDeregister() {
1225
1226 ioUringUnsafe().cancelOps(connectPromise != null);
1227 }
1228
1229 @Override
1230 protected void doBind(final SocketAddress local) throws Exception {
1231 if (local instanceof InetSocketAddress) {
1232 checkResolvable((InetSocketAddress) local);
1233 }
1234 socket.bind(local);
1235 this.local = socket.localAddress();
1236 }
1237
1238 protected static void checkResolvable(InetSocketAddress addr) {
1239 if (addr.isUnresolved()) {
1240 throw new UnresolvedAddressException();
1241 }
1242 }
1243
1244 @Override
1245 protected final SocketAddress localAddress0() {
1246 return local;
1247 }
1248
1249 @Override
1250 protected final SocketAddress remoteAddress0() {
1251 return remote;
1252 }
1253
1254 private static boolean isAllowHalfClosure(ChannelConfig config) {
1255 return config instanceof SocketChannelConfig &&
1256 ((SocketChannelConfig) config).isAllowHalfClosure();
1257 }
1258
1259 private void cancelConnectTimeoutFuture() {
1260 if (connectTimeoutFuture != null) {
1261 connectTimeoutFuture.cancel(false);
1262 connectTimeoutFuture = null;
1263 }
1264 }
1265
1266 private void computeRemote() {
1267 if (requestedRemoteAddress instanceof InetSocketAddress) {
1268 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1269 }
1270 }
1271
1272 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1273 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1274 }
1275
1276
1277
1278
1279
1280
1281
1282 protected abstract boolean socketIsEmpty(int flags);
1283
1284 abstract boolean isPollInFirst();
1285 }