1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.DomainSocketAddress;
43 import io.netty.channel.unix.Errors;
44 import io.netty.channel.unix.FileDescriptor;
45 import io.netty.channel.unix.UnixChannel;
46 import io.netty.channel.unix.UnixChannelUtil;
47 import io.netty.util.ReferenceCountUtil;
48 import io.netty.util.concurrent.PromiseNotifier;
49 import io.netty.util.internal.CleanableDirectBuffer;
50 import io.netty.util.internal.logging.InternalLogger;
51 import io.netty.util.internal.logging.InternalLoggerFactory;
52
53 import java.io.IOException;
54 import java.net.InetSocketAddress;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.AlreadyConnectedException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.ConnectionPendingException;
60 import java.nio.channels.NotYetConnectedException;
61 import java.nio.channels.UnresolvedAddressException;
62 import java.util.concurrent.ScheduledFuture;
63 import java.util.concurrent.TimeUnit;
64
65 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
66 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
67 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
68 import static io.netty.util.internal.ObjectUtil.checkNotNull;
69 import static io.netty.util.internal.StringUtil.className;
70
71
72 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
// Logger shared by all channels of this type.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
// The socket backing this channel; also returned by fd().
final LinuxSocket socket;
// Value reported by isActive(); set in the constructors / on connect completion and cleared in doClose().
protected volatile boolean active;
76
77
// Bit flags stored in ioState, one per kind of operation that may currently be outstanding.
// NOTE(review): bit 1 (1 << 1) is not used by any of the flags below.
private static final int POLL_IN_SCHEDULED = 1;
private static final int POLL_OUT_SCHEDULED = 1 << 2;
private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
private static final int WRITE_SCHEDULED = 1 << 4;
private static final int READ_SCHEDULED = 1 << 5;
private static final int CONNECT_SCHEDULED = 1 << 6;

// Counter backing nextOpsId(); starts at the smallest short value.
private short opsId = Short.MIN_VALUE;

// Ids returned by IoRegistration.submit(...) for the respective operation; 0 means "none".
private long pollInId;
private long pollOutId;
private long pollRdhupId;
private long connectId;

// Bitwise OR of the *_SCHEDULED flags above.
private byte ioState;

// Number of writes that were scheduled and did not complete yet.
private short numOutstandingWrites;
// Number of reads that were scheduled and did not complete yet; -1 is treated as "multishot read" by readComplete(...).
private short numOutstandingReads;

// Set by doBeginRead() when a read was requested and not served yet.
private boolean readPending;
// True while readComplete(...) is executing, so doBeginRead() does not schedule from within it.
private boolean inReadComplete;
// Remembers that the socket is known to have data, so a read can be scheduled without polling first.
private boolean socketHasMoreData;
105
106 private static final class DelayedClose {
107 private final ChannelPromise promise;
108 private final Throwable cause;
109 private final ClosedChannelException closeCause;
110
111 DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
112 this.promise = promise;
113 this.cause = cause;
114 this.closeCause = closeCause;
115 }
116 }
// Non-null once close(...) was requested; holds the arguments until the close can actually be executed.
private DelayedClose delayedClose;
// Once true no further reads are started (see doBeginRead() / doBeginReadNow()).
private boolean inputClosedSeenErrorOnRead;

// The promise of the connect attempt that is currently in progress, or null if there is none.
private ChannelPromise connectPromise;
// The scheduled connect timeout task, only created when the configured connect timeout is > 0.
private ScheduledFuture<?> connectTimeoutFuture;
// The remote address passed to the connect attempt that is currently in progress.
private SocketAddress requestedRemoteAddress;
// Direct memory holding the encoded remote address for a submitted connect; released by freeRemoteAddressMemory().
private CleanableDirectBuffer cleanable;
private ByteBuffer remoteAddressMemory;
// Message header memory used when the connect is done via sendmsg; released by freeMsgHdrArray().
private MsgHdrMemoryArray msgHdrMemoryArray;

// Set once the registration with the event loop succeeded (see doRegister(...)).
private IoRegistration registration;

// Cached addresses returned by localAddress0() / remoteAddress0().
private volatile SocketAddress local;
private volatile SocketAddress remote;
134
135 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
136 super(parent);
137 this.socket = checkNotNull(socket, "fd");
138
139 if (active) {
140
141
142 this.active = true;
143 this.local = socket.localAddress();
144 this.remote = socket.remoteAddress();
145 }
146
147 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
148 }
149
150 AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
151 super(parent);
152 this.socket = checkNotNull(fd, "fd");
153 this.active = true;
154
155
156
157 this.remote = remote;
158 this.local = fd.localAddress();
159 }
160
161
162 final void autoReadCleared() {
163 if (!isRegistered()) {
164 return;
165 }
166 IoRegistration registration = this.registration;
167 if (registration == null || !registration.isValid()) {
168 return;
169 }
170 if (eventLoop().inEventLoop()) {
171 clearRead();
172 } else {
173 eventLoop().execute(this::clearRead);
174 }
175 }
176
177 private void clearRead() {
178 assert eventLoop().inEventLoop();
179 readPending = false;
180 IoRegistration registration = this.registration;
181 if (registration == null || !registration.isValid()) {
182 return;
183 }
184
185 cancelOutstandingReads(registration(), numOutstandingReads);
186 }
187
188
189
190
191
192
193 protected final short nextOpsId() {
194 short id = opsId++;
195
196
197 if (id == 0) {
198 id = opsId++;
199 }
200 return id;
201 }
202
203 public final boolean isOpen() {
204 return socket.isOpen();
205 }
206
207 @Override
208 public boolean isActive() {
209 return active;
210 }
211
212 @Override
213 public final FileDescriptor fd() {
214 return socket;
215 }
216
217 private AbstractUringUnsafe ioUringUnsafe() {
218 return (AbstractUringUnsafe) unsafe();
219 }
220
221 @Override
222 protected boolean isCompatible(final EventLoop loop) {
223 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
224 }
225
226 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
227 return newDirectBuffer(buf, buf);
228 }
229
230 protected boolean allowMultiShotPollIn() {
231 return IoUring.isPollAddMultishotEnabled();
232 }
233
234 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
235 final int readableBytes = buf.readableBytes();
236 if (readableBytes == 0) {
237 ReferenceCountUtil.release(holder);
238 return Unpooled.EMPTY_BUFFER;
239 }
240
241 final ByteBufAllocator alloc = alloc();
242 if (alloc.isDirectBufferPooled()) {
243 return newDirectBuffer0(holder, buf, alloc, readableBytes);
244 }
245
246 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
247 if (directBuf == null) {
248 return newDirectBuffer0(holder, buf, alloc, readableBytes);
249 }
250
251 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
252 ReferenceCountUtil.safeRelease(holder);
253 return directBuf;
254 }
255
256 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
257 final ByteBuf directBuf = alloc.directBuffer(capacity);
258 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
259 ReferenceCountUtil.safeRelease(holder);
260 return directBuf;
261 }
262
263
264
265
266
267
268
/**
 * Cancels the reads that are currently outstanding. Called when a pending read is cleared, when
 * operations are cancelled (close / deregister) and from the read completion handling.
 *
 * @param registration        the registration to submit the cancellation to
 * @param numOutstandingReads the current value of the outstanding-read counter
 */
protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
270
271
272
273
274
275
276
/**
 * Cancels the writes that are currently outstanding. Called when operations are cancelled
 * (close / deregister).
 *
 * @param registration         the registration to submit the cancellation to
 * @param numOutstandingWrites the current value of the outstanding-write counter
 */
protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
278
279 @Override
280 protected void doDisconnect() throws Exception {
281 }
282
283 private void freeRemoteAddressMemory() {
284 if (remoteAddressMemory != null) {
285 cleanable.clean();
286 cleanable = null;
287 remoteAddressMemory = null;
288 }
289 }
290
291 private void freeMsgHdrArray() {
292 if (msgHdrMemoryArray != null) {
293 msgHdrMemoryArray.release();
294 msgHdrMemoryArray = null;
295 }
296 }
297
298 @Override
299 protected void doClose() throws Exception {
300 active = false;
301
302 if (registration != null) {
303 if (socket.markClosed()) {
304 int fd = fd().intValue();
305 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
306 registration.submit(ops);
307 }
308 } else {
309
310 socket.close();
311 ioUringUnsafe().unregistered();
312 }
313 }
314
315 @Override
316 protected final void doBeginRead() {
317 if (inputClosedSeenErrorOnRead) {
318
319 return;
320 }
321 if (readPending) {
322
323 return;
324 }
325 readPending = true;
326 if (inReadComplete || !isActive()) {
327
328
329
330 return;
331 }
332 doBeginReadNow();
333 }
334
335 private void doBeginReadNow() {
336 if (inputClosedSeenErrorOnRead) {
337
338 return;
339 }
340 if (!isPollInFirst() ||
341
342
343 socketHasMoreData) {
344
345 ioUringUnsafe().scheduleFirstReadIfNeeded();
346 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
347 ioUringUnsafe().schedulePollIn();
348 }
349 }
350
351 @Override
352 protected void doWrite(ChannelOutboundBuffer in) {
353 scheduleWriteIfNeeded(in, true);
354 }
355
356 protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
357 if ((ioState & WRITE_SCHEDULED) != 0) {
358 return;
359 }
360 if (scheduleWrite(in) > 0) {
361 ioState |= WRITE_SCHEDULED;
362 if (submitAndRunNow && !isWritable()) {
363 submitAndRunNow();
364 }
365 }
366 }
367
368 protected void submitAndRunNow() {
369
370 }
371
372 private int scheduleWrite(ChannelOutboundBuffer in) {
373 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
374 return 0;
375 }
376 if (in == null) {
377 return 0;
378 }
379
380 int msgCount = in.size();
381 if (msgCount == 0) {
382 return 0;
383 }
384 Object msg = in.current();
385
386 if (msgCount > 1 && in.current() instanceof ByteBuf) {
387 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
388 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
389 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
390
391 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
392 } else {
393 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
394 }
395
396 assert numOutstandingWrites > 0;
397 return numOutstandingWrites;
398 }
399
400 protected final IoRegistration registration() {
401 assert registration != null;
402 return registration;
403 }
404
405 private void schedulePollOut() {
406 pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
407 }
408
409 final void schedulePollRdHup() {
410 pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
411 }
412
413 private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
414 assert (ioState & ioMask) == 0;
415 int fd = fd().intValue();
416 IoRegistration registration = registration();
417 IoUringIoOps ops = IoUringIoOps.newPollAdd(
418 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
419 long id = registration.submit(ops);
420 if (id != 0) {
421 ioState |= (byte) ioMask;
422 }
423 return id;
424 }
425
426 final void resetCachedAddresses() {
427 local = socket.localAddress();
428 remote = socket.remoteAddress();
429 }
430
431 protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
// Lazily created by recvBufAllocHandle().
private IoUringRecvByteAllocatorHandle allocHandle;
// Set once a CLOSE completion that was not cancelled has been handled.
private boolean closed;
// Result of socketIsEmpty(flags) for the read completion currently being handled; reset afterwards.
private boolean socketIsEmpty;
435
436
437
438
439
/**
 * Schedules the write of multiple messages / buffers taken from {@code in}.
 *
 * @param in the outbound buffer to write from
 * @return the number of scheduled writes; stored (narrowed to {@code short}) as the outstanding-write counter
 */
protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
441
442
443
444
445
/**
 * Schedules the write of a single message.
 *
 * @param msg the message to write
 * @return the number of scheduled writes; stored (narrowed to {@code short}) as the outstanding-write counter
 */
protected abstract int scheduleWriteSingle(Object msg);
447
/**
 * Dispatches one completion event to the handler that matches its opcode. Afterwards a pending delayed
 * close is retried, and the registration is cancelled once the close completed and no operation is
 * scheduled anymore.
 *
 * @param registration the registration the event belongs to
 * @param ioEvent      the event; must be an {@code IoUringIoEvent}
 */
@Override
public final void handle(IoRegistration registration, IoEvent ioEvent) {
    IoUringIoEvent event = (IoUringIoEvent) ioEvent;
    byte op = event.opcode();
    int res = event.res();
    int flags = event.flags();
    short data = event.data();
    switch (op) {
        // All read-like operations share one completion handler.
        case Native.IORING_OP_RECV:
        case Native.IORING_OP_ACCEPT:
        case Native.IORING_OP_RECVMSG:
        case Native.IORING_OP_READ:
            readComplete(op, res, flags, data);
            break;
        // All write-like operations share one completion handler.
        case Native.IORING_OP_WRITEV:
        case Native.IORING_OP_SEND:
        case Native.IORING_OP_SENDMSG:
        case Native.IORING_OP_WRITE:
        case Native.IORING_OP_SPLICE:
        case Native.IORING_OP_SEND_ZC:
        case Native.IORING_OP_SENDMSG_ZC:
            writeComplete(op, res, flags, data);
            break;
        case Native.IORING_OP_POLL_ADD:
            pollAddComplete(res, flags, data);
            break;
        case Native.IORING_OP_ASYNC_CANCEL:
            cancelComplete0(op, res, flags, data);
            break;
        case Native.IORING_OP_CONNECT:
            connectComplete(op, res, flags, data);
            // The connect finished, so the memory that was only needed for it can be released.
            freeMsgHdrArray();
            freeRemoteAddressMemory();
            break;
        case Native.IORING_OP_CLOSE:
            // A cancelled close is ignored; any other result marks the channel as closed.
            if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
                if (delayedClose != null) {
                    delayedClose.promise.setSuccess();
                }
                closed = true;
            }
            break;
        default:
            break;
    }

    // The completion may have cleared the last operation that was blocking a delayed close.
    handleDelayedClosed();

    if (ioState == 0 && closed) {
        // Closed and nothing scheduled anymore: the registration is not needed any longer.
        registration.cancel();
    }
}
505
506 @Override
507 public void unregistered() {
508 freeMsgHdrArray();
509 freeRemoteAddressMemory();
510 }
511
512 private void handleDelayedClosed() {
513 if (delayedClose != null && canCloseNow()) {
514 closeNow();
515 }
516 }
517
518 private void pollAddComplete(int res, int flags, short data) {
519 if ((res & Native.POLLOUT) != 0) {
520 pollOut(res);
521 }
522 if ((res & Native.POLLIN) != 0) {
523 pollIn(res, flags, data);
524 }
525 if ((res & Native.POLLRDHUP) != 0) {
526 pollRdHup(res);
527 }
528 }
529
530 @Override
531 public final void close() throws Exception {
532 close(voidPromise());
533 }
534
/**
 * Requests the close of the channel. The request is stored as a delayed close and all outstanding
 * operations are cancelled; the actual close only happens once {@code canCloseNow()} allows it, either
 * right here or later from the completion handling.
 *
 * @param promise    completed once the channel is closed
 * @param cause      the cause handed to the close operation of the super class
 * @param closeCause the {@link ClosedChannelException} handed to the close operation of the super class
 */
@Override
protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
    if (closeFuture().isDone()) {
        // Already closed, just notify the promise.
        safeSetSuccess(promise);
        return;
    }
    if (delayedClose == null) {
        // First close request. A void promise is replaced by a real one so that later requests
        // (else branch) and the CLOSE completion can attach to / complete it.
        delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
    } else {
        // A close is in progress already, just get notified when it finishes.
        delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
        return;
    }

    boolean cancelConnect = false;
    try {
        ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
        if (connectPromise != null) {
            // A connect is still pending: fail it and remember that it needs to be cancelled.
            connectPromise.tryFailure(new ClosedChannelException());
            AbstractIoUringChannel.this.connectPromise = null;
            cancelConnect = true;
        }

        cancelConnectTimeoutFuture();
    } finally {
        // Cancel the outstanding operations even if the above threw.
        cancelOps(cancelConnect);
    }

    if (canCloseNow()) {
        // Nothing blocks the close, execute it right away.
        closeNow();
    }
}
574
/**
 * Submits cancellations for the scheduled polls, optionally the connect, and asks the subclass to cancel
 * outstanding reads and writes. Does nothing without a valid registration.
 *
 * @param cancelConnect whether a submitted connect should be cancelled as well
 */
private void cancelOps(boolean cancelConnect) {
    if (registration == null || !registration.isValid()) {
        return;
    }
    byte flags = (byte) 0;
    // Each poll is only cancelled if it is marked as scheduled and its id is known; the id is reset
    // afterwards so it is not cancelled twice.
    if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
        long id = registration.submit(
                IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
        assert id != 0;
        pollRdhupId = 0;
    }
    if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
        long id = registration.submit(
                IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
        assert id != 0;
        pollInId = 0;
    }
    if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
        long id = registration.submit(
                IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
        assert id != 0;
        pollOutId = 0;
    }
    if (cancelConnect && connectId != 0) {
        // Cancel the connect that is still in flight.
        long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
        assert id != 0;
        connectId = 0;
    }
    cancelOutstandingReads(registration, numOutstandingReads);
    cancelOutstandingWrites(registration, numOutstandingWrites);
}
607
608 private boolean canCloseNow() {
609
610
611 return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
612 }
613
614 protected boolean canCloseNow0() {
615 return true;
616 }
617
618 private void closeNow() {
619 super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
620 }
621
622 @Override
623 protected final void flush0() {
624
625
626
627 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
628 super.flush0();
629 }
630 }
631
632 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
633 if (promise == null) {
634
635 return;
636 }
637
638
639 promise.tryFailure(cause);
640 closeIfClosed();
641 }
642
643 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
644 if (promise == null) {
645
646 return;
647 }
648 active = true;
649
650 if (local == null) {
651 local = socket.localAddress();
652 }
653 computeRemote();
654
655
656 schedulePollRdHup();
657
658
659
660 boolean active = isActive();
661
662
663 boolean promiseSet = promise.trySuccess();
664
665
666
667 if (!wasActive && active) {
668 pipeline().fireChannelActive();
669 }
670
671
672 if (!promiseSet) {
673 close(voidPromise());
674 }
675 }
676
677 @Override
678 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
679 if (allocHandle == null) {
680 allocHandle = new IoUringRecvByteAllocatorHandle(
681 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
682 }
683 return allocHandle;
684 }
685
/**
 * Handles the shutdown of the input side. With half-closure allowed the input is shut down and a
 * {@link ChannelInputShutdownEvent} is fired; without it the channel is closed.
 *
 * @param allDataRead if {@code true} a {@link ChannelInputShutdownReadComplete} event is fired once
 */
final void shutdownInput(boolean allDataRead) {
    logger.trace("shutdownInput Fd: {}", fd().intValue());
    if (!socket.isInputShutdown()) {
        if (isAllowHalfClosure(config())) {
            try {
                socket.shutdown(true, false);
            } catch (IOException ignored) {
                // Shutting down the input failed: still fire the event, then close the channel.
                fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
                return;
            } catch (NotYetConnectedException ignore) {
                // Deliberately ignored; processing continues with firing the event below.
            }
            pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
        } else {
            // Setting this flag makes sure no further read is started before the channel is closed.
            inputClosedSeenErrorOnRead = true;
            close(voidPromise());
            return;
        }
    }
    if (allDataRead && !inputClosedSeenErrorOnRead) {
        // The flag guards against firing the read-complete event more than once.
        inputClosedSeenErrorOnRead = true;
        pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
    }
}
714
715 private void fireEventAndClose(Object evt) {
716 pipeline().fireUserEventTriggered(evt);
717 close(voidPromise());
718 }
719
720 final void schedulePollIn() {
721 assert (ioState & POLL_IN_SCHEDULED) == 0;
722 if (!isActive() || shouldBreakIoUringInReady(config())) {
723 return;
724 }
725 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
726 }
727
/**
 * Handles the completion of a read-like operation: updates the read bookkeeping, delegates the actual
 * processing to {@link #readComplete0(byte, int, int, short, int)} and finally decides whether another
 * read has to be scheduled or a multishot read has to be cancelled.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the operation
 * @param flags the completion flags
 * @param data  the data attached to the operation
 */
private void readComplete(byte op, int res, int flags, short data) {
    assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;

    // -1 marks a multishot read; a completion without the MORE flag is the last one of the operation.
    boolean multishot = numOutstandingReads == -1;
    boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
    if (rearm) {
        // No more completions will follow for this operation, so no read is scheduled anymore.
        ioState &= ~READ_SCHEDULED;
    }
    // Remember the state before it is reset below; needed for the cancellation handling.
    boolean pending = readPending;
    if (multishot) {
        // Reset so it is visible afterwards whether a new read was requested while handling the completion.
        readPending = false;
    } else if (--numOutstandingReads == 0) {
        // That was the last outstanding read.
        readPending = false;
        ioState &= ~READ_SCHEDULED;
    }
    inReadComplete = true;
    try {
        socketIsEmpty = socketIsEmpty(flags);
        socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
                (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
        readComplete0(op, res, flags, data, numOutstandingReads);
    } finally {
        try {
            // Is the current read loop done?
            if (recvBufAllocHandle().isReadComplete()) {
                // Reset the handle for the next read loop.
                recvBufAllocHandle().reset(config());

                if (!multishot) {
                    if (readPending) {
                        // Another read was requested while handling the completion, schedule it now
                        // (doBeginRead() only records the request while inReadComplete is set).
                        doBeginReadNow();
                    }
                } else {
                    // Multishot read.
                    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                        // The read was cancelled. If a read was pending when the completion arrived it
                        // has to be scheduled again.
                        if (pending) {
                            doBeginReadNow();
                        }
                    } else if (rearm) {
                        // The multishot operation ended, start reading again.
                        doBeginReadNow();
                    } else if (!readPending) {
                        // The multishot operation is still armed but no new read was requested while
                        // handling the completion, so cancel it.
                        cancelOutstandingReads(registration, numOutstandingReads);
                    }
                }
            } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                // The read was cancelled before the read loop finished. If a read was pending when the
                // completion arrived it has to be scheduled again.
                if (pending) {
                    doBeginReadNow();
                }
            } else if (multishot && rearm) {
                // The multishot operation ended before the read loop finished, start reading again.
                doBeginReadNow();
            }
        } finally {
            inReadComplete = false;
            socketIsEmpty = false;
        }
    }
}
807
808
809
810
/**
 * Processes the completion of a read-like operation; called from the generic read completion handling
 * after the bookkeeping was updated.
 *
 * @param op                   the opcode of the completed operation
 * @param res                  the result of the operation
 * @param flags                the completion flags
 * @param data                 the data attached to the operation
 * @param outstandingCompletes the value of the outstanding-read counter after this completion was accounted
 */
protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
812
813
814
815
816 private void pollRdHup(int res) {
817 ioState &= ~POLL_RDHUP_SCHEDULED;
818 pollRdhupId = 0;
819 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
820 return;
821 }
822
823
824 recvBufAllocHandle().rdHupReceived();
825
826 if (isActive()) {
827 scheduleFirstReadIfNeeded();
828 } else {
829
830 shutdownInput(false);
831 }
832 }
833
834
835
836
837 private void pollIn(int res, int flags, short data) {
838
839 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
840 if (rearm) {
841 ioState &= ~POLL_IN_SCHEDULED;
842 pollInId = 0;
843 }
844 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
845 return;
846 }
847 if (!readPending) {
848
849
850 socketHasMoreData = true;
851 return;
852 }
853 scheduleFirstReadIfNeeded();
854 }
855
856 private void scheduleFirstReadIfNeeded() {
857 if ((ioState & READ_SCHEDULED) == 0) {
858 scheduleFirstRead();
859 }
860 }
861
862 private void scheduleFirstRead() {
863
864 final ChannelConfig config = config();
865 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
866 allocHandle.reset(config);
867 scheduleRead(true);
868 }
869
870 protected final void scheduleRead(boolean first) {
871
872 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
873 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
874 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
875 ioState |= READ_SCHEDULED;
876 }
877 }
878 }
879
880
881
882
883
884
885
886
887
888
889
/**
 * Schedules the actual read operation(s).
 *
 * @param first         whether this is the first read of a read loop
 * @param socketIsEmpty the socket-empty state recorded for the completion currently being handled
 *                      ({@code false} outside of a read completion)
 * @return the number of scheduled reads; a positive value or {@code -1} marks a read as scheduled, and
 *         {@code -1} is treated as multishot read by the completion handling
 */
protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
891
892
893
894
895
896
/**
 * Handles a POLLOUT completion: either finishes a pending connect or retries the flush.
 *
 * @param res the result of the poll
 */
private void pollOut(int res) {
    ioState &= ~POLL_OUT_SCHEDULED;
    pollOutId = 0;
    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        return;
    }
    // A connect is pending, so the POLLOUT belongs to it.
    if (connectPromise != null) {
        assert eventLoop().inEventLoop();

        boolean connectStillInProgress = false;
        try {
            boolean wasActive = isActive();
            if (!socket.finishConnect()) {
                connectStillInProgress = true;
                return;
            }
            fulfillConnectPromise(connectPromise, wasActive);
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
        } finally {
            if (!connectStillInProgress) {
                // The connect finished (successfully or not): stop the timeout and forget the promise.
                cancelConnectTimeoutFuture();
                connectPromise = null;
            } else {
                // Not connected yet, wait for the next POLLOUT.
                schedulePollOut();
            }
        }
    } else if (!socket.isOutputShutdown()) {
        // No connect pending: the socket became writable again, retry the flush.
        super.flush0();
    }
}
937
938
939
940
941
942
943
944
945
/**
 * Handles the completion of a write-like operation. While a connect is scheduled the completion belongs
 * to the sendmsg that was submitted by {@code connect(...)} and is handled as part of the connect.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the operation
 * @param flags the completion flags
 * @param data  the data attached to the operation
 */
private void writeComplete(byte op, int res, int flags, short data) {
    if ((ioState & CONNECT_SCHEDULED) != 0) {
        // Completion of the sendmsg used for the connect; its header memory is not needed anymore.
        freeMsgHdrArray();
        if (res > 0) {
            // Data was sent: remove it from the outbound buffer ...
            outboundBuffer().removeBytes(res);

            // ... and report a successful connect (result 0).
            connectComplete(op, 0, flags, data);
        } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
            // Nothing was sent, fall back to a regular connect. The data stays in the outbound buffer.
            submitConnect((InetSocketAddress) requestedRemoteAddress);
        } else {
            // An error: handle it like a failed connect.
            connectComplete(op, res, flags, data);
        }
        return;
    }

    // Completions carrying the NOTIF flag do not count against the outstanding writes.
    if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
        assert numOutstandingWrites > 0;
        --numOutstandingWrites;
    }

    boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
    if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
        // Not everything was written: wait for the socket to become writable again.
        schedulePollOut();
    }

    // WRITE_SCHEDULED is only cleared after writeComplete0(...) returned, so no new write can be
    // scheduled while it is still running.
    if (numOutstandingWrites == 0) {
        ioState &= ~WRITE_SCHEDULED;

        // Everything was written and no POLLOUT is scheduled: try to write whatever is left.
        if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
            scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
        }
    }
}
993
994
995
996
997
998
999
1000
1001
/**
 * Processes the completion of a write-like operation.
 *
 * @param op          the opcode of the completed operation
 * @param res         the result of the operation
 * @param flags       the completion flags
 * @param data        the data attached to the operation
 * @param outstanding the value of the outstanding-write counter after this completion was accounted
 * @return {@code true} if everything was written; {@code false} makes the caller schedule a POLLOUT
 */
abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012 void cancelComplete0(byte op, int res, int flags, short data) {
1013
1014 }
1015
1016
1017
1018
1019
1020
1021
1022
/**
 * Handles the result of a connect attempt.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result: {@code 0} for success, in-progress / already results make the channel wait for
 *              POLLOUT, anything else is treated as error
 * @param flags the completion flags
 * @param data  the data attached to the operation
 */
void connectComplete(byte op, int res, int flags, short data) {
    ioState &= ~CONNECT_SCHEDULED;
    freeRemoteAddressMemory();

    if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
        // The connect did not finish yet, wait for POLLOUT to complete it (see pollOut(...)).
        schedulePollOut();
    } else {
        try {
            if (res == 0) {
                fulfillConnectPromise(connectPromise, active);
                // A read may have been requested while the connect was in progress.
                if (readPending) {
                    doBeginReadNow();
                }
            } else {
                try {
                    // Translates the error code into an exception, which is then used to fail the promise.
                    Errors.throwConnectException("io_uring connect", res);
                } catch (Throwable cause) {
                    fulfillConnectPromise(connectPromise, cause);
                }
            }
        } finally {
            // The attempt is over either way: stop the timeout (if any) and forget the promise.
            cancelConnectTimeoutFuture();
            connectPromise = null;
        }
    }
}
1053
/**
 * Starts a connect to {@code remoteAddress}, optionally binding to {@code localAddress} first. The
 * promise is completed later from the completion handling, by the connect timeout or by a close.
 *
 * @param remoteAddress the address to connect to; an {@link InetSocketAddress} or {@link DomainSocketAddress}
 * @param localAddress  the address to bind to before connecting, or {@code null}
 * @param promise       the promise to notify about the outcome
 */
@Override
public void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // Nothing to do if the promise is completed already or the channel is not open.
    if (promise.isDone() || !ensureOpen(promise)) {
        return;
    }

    if (delayedClose != null) {
        // A close was requested already, do not start a connect.
        promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
        return;
    }
    try {
        if (connectPromise != null) {
            throw new ConnectionPendingException();
        }
        if (localAddress instanceof InetSocketAddress) {
            checkResolvable((InetSocketAddress) localAddress);
        }

        if (remoteAddress instanceof InetSocketAddress) {
            checkResolvable((InetSocketAddress) remoteAddress);
        }

        if (remote != null) {
            // A remote address is cached already, so the channel is treated as connected.
            throw new AlreadyConnectedException();
        }

        if (localAddress != null) {
            socket.bind(localAddress);
        }

        if (remoteAddress instanceof InetSocketAddress) {
            InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
            ByteBuf initialData = null;
            // With TCP fast-open enabled the first flushed ByteBuf (if any) is sent along with the connect.
            if (IoUring.isTcpFastOpenClientSideAvailable() &&
                    config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
                ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
                outbound.addFlush();
                Object curr;
                if ((curr = outbound.current()) instanceof ByteBuf) {
                    initialData = (ByteBuf) curr;
                }
            }
            if (initialData != null) {
                // Connect via sendmsg with MSG_FASTOPEN; the result is handled in writeComplete(...).
                msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
                MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
                hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
                        initialData.readableBytes(), (short) 0);

                int fd = fd().intValue();
                IoRegistration registration = registration();
                IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
                        hdr.address(), hdr.idx());
                connectId = registration.submit(ops);
                if (connectId == 0) {
                    // The submit was not accepted, release the header memory right away.
                    freeMsgHdrArray();
                }
            } else {
                submitConnect(inetSocketAddress);
            }
        } else if (remoteAddress instanceof DomainSocketAddress) {
            DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
            submitConnect(unixDomainSocketAddress);
        } else {
            throw new Error("Unexpected SocketAddress implementation " + className(remoteAddress));
        }

        if (connectId != 0) {
            ioState |= CONNECT_SCHEDULED;
        }
    } catch (Throwable t) {
        closeIfClosed();
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        return;
    }
    connectPromise = promise;
    requestedRemoteAddress = remoteAddress;
    // Schedule the connect timeout, if one is configured.
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                // Only close if this task is the one that failed the promise.
                if (connectPromise != null && !connectPromise.isDone() &&
                        connectPromise.tryFailure(new ConnectTimeoutException(
                                "connection timed out: " + remoteAddress))) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    promise.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            // A cancelled connect also cancels the timeout and closes the channel.
            if (future.isCancelled()) {
                cancelConnectTimeoutFuture();
                connectPromise = null;
                close(voidPromise());
            }
        }
    });
}
1166 }
1167
1168 private void submitConnect(InetSocketAddress inetSocketAddress) {
1169 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1170 remoteAddressMemory = cleanable.buffer();
1171
1172 SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1173
1174 int fd = fd().intValue();
1175 IoRegistration registration = registration();
1176 IoUringIoOps ops = IoUringIoOps.newConnect(
1177 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1178 connectId = registration.submit(ops);
1179 if (connectId == 0) {
1180
1181 freeRemoteAddressMemory();
1182 }
1183 }
1184
1185 private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1186 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1187 remoteAddressMemory = cleanable.buffer();
1188 SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1189 int fd = fd().intValue();
1190 IoRegistration registration = registration();
1191 long addr = Buffer.memoryAddress(remoteAddressMemory);
1192 IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, Native.SIZEOF_SOCKADDR_UN, nextOpsId());
1193 connectId = registration.submit(ops);
1194 if (connectId == 0) {
1195
1196 freeRemoteAddressMemory();
1197 }
1198 }
1199
1200 @Override
1201 protected Object filterOutboundMessage(Object msg) {
1202 if (msg instanceof ByteBuf) {
1203 ByteBuf buf = (ByteBuf) msg;
1204 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1205 }
1206 throw new UnsupportedOperationException("unsupported message type");
1207 }
1208
1209 @Override
1210 protected void doRegister(ChannelPromise promise) {
1211 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1212 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1213 if (f.isSuccess()) {
1214 registration = (IoRegistration) f.getNow();
1215 promise.setSuccess();
1216 } else {
1217 promise.setFailure(f.cause());
1218 }
1219 });
1220 }
1221
1222 @Override
1223 protected final void doDeregister() {
1224
1225 ioUringUnsafe().cancelOps(connectPromise != null);
1226 }
1227
1228 @Override
1229 protected void doBind(final SocketAddress local) throws Exception {
1230 if (local instanceof InetSocketAddress) {
1231 checkResolvable((InetSocketAddress) local);
1232 }
1233 socket.bind(local);
1234 this.local = socket.localAddress();
1235 }
1236
1237 protected static void checkResolvable(InetSocketAddress addr) {
1238 if (addr.isUnresolved()) {
1239 throw new UnresolvedAddressException();
1240 }
1241 }
1242
1243 @Override
1244 protected final SocketAddress localAddress0() {
1245 return local;
1246 }
1247
1248 @Override
1249 protected final SocketAddress remoteAddress0() {
1250 return remote;
1251 }
1252
1253 private static boolean isAllowHalfClosure(ChannelConfig config) {
1254 return config instanceof SocketChannelConfig &&
1255 ((SocketChannelConfig) config).isAllowHalfClosure();
1256 }
1257
1258 private void cancelConnectTimeoutFuture() {
1259 if (connectTimeoutFuture != null) {
1260 connectTimeoutFuture.cancel(false);
1261 connectTimeoutFuture = null;
1262 }
1263 }
1264
1265 private void computeRemote() {
1266 if (requestedRemoteAddress instanceof InetSocketAddress) {
1267 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1268 }
1269 }
1270
1271 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1272 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1273 }
1274
1275
1276
1277
1278
1279
1280
/**
 * Decides from the flags of a read completion whether the socket is considered empty. The result is
 * handed to {@code scheduleRead0(...)} for reads scheduled while that completion is handled.
 *
 * @param flags the completion flags
 * @return {@code true} if the socket is considered empty
 */
protected abstract boolean socketIsEmpty(int flags);
1282
/**
 * @return {@code true} if a POLLIN should be scheduled before reading; when {@code false} (or when the
 *         socket is known to have more data) a read is scheduled directly
 */
abstract boolean isPollInFirst();
1284 }