1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.Errors;
43 import io.netty.channel.unix.FileDescriptor;
44 import io.netty.channel.unix.UnixChannel;
45 import io.netty.channel.unix.UnixChannelUtil;
46 import io.netty.util.ReferenceCountUtil;
47 import io.netty.util.concurrent.PromiseNotifier;
48 import io.netty.util.internal.logging.InternalLogger;
49 import io.netty.util.internal.logging.InternalLoggerFactory;
50
51 import java.io.IOException;
52 import java.net.InetSocketAddress;
53 import java.net.SocketAddress;
54 import java.nio.ByteBuffer;
55 import java.nio.channels.AlreadyConnectedException;
56 import java.nio.channels.ClosedChannelException;
57 import java.nio.channels.ConnectionPendingException;
58 import java.nio.channels.NotYetConnectedException;
59 import java.nio.channels.UnresolvedAddressException;
60 import java.util.concurrent.ScheduledFuture;
61 import java.util.concurrent.TimeUnit;
62
63 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
64 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
65 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
66 import static io.netty.util.internal.ObjectUtil.checkNotNull;
67
68
69 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
/** Logger for this class; used for trace output when a channel is created and when input is shut down. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
/** The underlying socket; both constructors reject {@code null}. */
final LinuxSocket socket;
/** Backing flag for {@link #isActive()}; set when connected and cleared in {@link #doClose()}. */
protected volatile boolean active;
73
74
// Bit flags kept in ioState. A bit is set once the matching operation was submitted and
// cleared again when its completion is handled.
private static final int POLL_IN_SCHEDULED = 1;
private static final int POLL_OUT_SCHEDULED = 1 << 2;
private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
private static final int WRITE_SCHEDULED = 1 << 4;
private static final int READ_SCHEDULED = 1 << 5;
private static final int CONNECT_SCHEDULED = 1 << 6;

// Counter backing nextOpsId(); nextOpsId() never hands out 0.
private short opsId = Short.MIN_VALUE;

// Ids returned by IoRegistration.submit(...) for the respective operations. The poll ids are
// reset to 0 when the poll completes and are needed to submit an async cancel.
private long pollInId;
private long pollOutId;
private long pollRdhupId;
private long connectId;

// Combination of the *_SCHEDULED flags above.
private byte ioState;

// Number of submitted writes whose completion has not been processed yet.
private short numOutstandingWrites;
// Number of submitted reads whose completion has not been processed yet;
// readComplete(...) treats the value -1 as "multishot read".
private short numOutstandingReads;

// true while a read was requested via doBeginRead() and not yet satisfied.
private boolean readPending;
// true while readComplete(...) is running, so doBeginRead() does not schedule re-entrantly.
private boolean inReadComplete;
// true when the last completion indicated the socket still has data queued; makes
// doBeginReadNow() read directly instead of polling first.
private boolean socketHasMoreData;
102
103 private static final class DelayedClose {
104 private final ChannelPromise promise;
105 private final Throwable cause;
106 private final ClosedChannelException closeCause;
107
108 DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
109 this.promise = promise;
110 this.cause = cause;
111 this.closeCause = closeCause;
112 }
113 }
// Non-null once close(...) was requested; holds the promise and causes until the close can run.
private DelayedClose delayedClose;
// Set once input is considered fully closed; doBeginRead() then becomes a no-op.
private boolean inputClosedSeenErrorOnRead;

// Promise of the connect attempt that is currently in flight, or null if there is none.
private ChannelPromise connectPromise;
// Timeout task for the in-flight connect; only created when the configured timeout is > 0.
private ScheduledFuture<?> connectTimeoutFuture;
// Address passed to connect(...); used to annotate failures and to compute the remote address.
private SocketAddress requestedRemoteAddress;
// Native memory holding the socket address of a submitted connect; released by freeRemoteAddressMemory().
private ByteBuffer remoteAddressMemory;
// Message header memory used for the TCP fast-open sendmsg; released by freeMsgHdrArray().
private MsgHdrMemoryArray msgHdrMemoryArray;

// Assigned in doRegister(...) once registration with the event loop succeeded.
private IoRegistration registration;

// Cached addresses returned by localAddress0() / remoteAddress0().
private volatile SocketAddress local;
private volatile SocketAddress remote;
130
131 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
132 super(parent);
133 this.socket = checkNotNull(socket, "fd");
134
135 if (active) {
136
137
138 this.active = true;
139 this.local = socket.localAddress();
140 this.remote = socket.remoteAddress();
141 }
142
143 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
144 }
145
/**
 * Creates an already active channel whose remote address is supplied by the caller.
 *
 * @param parent the parent channel, passed to the super constructor
 * @param fd     the socket to wrap; must not be {@code null}
 * @param remote the remote address to cache as-is
 */
AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    active = true;

    // The remote address is taken from the caller; only the local one is read from the socket.
    this.remote = remote;
    local = socket.localAddress();
}
156
157
158 final void autoReadCleared() {
159 if (!isRegistered()) {
160 return;
161 }
162 readPending = false;
163 IoRegistration registration = this.registration;
164 if (registration == null || !registration.isValid()) {
165 return;
166 }
167 if (eventLoop().inEventLoop()) {
168 clearRead();
169 } else {
170 eventLoop().execute(this::clearRead);
171 }
172 }
173
174 private void clearRead() {
175 assert eventLoop().inEventLoop();
176 readPending = false;
177 IoRegistration registration = this.registration;
178 if (registration == null || !registration.isValid()) {
179 return;
180 }
181 if ((ioState & POLL_IN_SCHEDULED) != 0) {
182
183 assert pollInId != 0;
184 long id = registration.submit(
185 IoUringIoOps.newAsyncCancel((byte) 0, pollInId, Native.IORING_OP_POLL_ADD));
186 assert id != 0;
187 }
188
189 cancelOutstandingReads(registration(), numOutstandingReads);
190 }
191
192
193
194
195
196
197 protected final short nextOpsId() {
198 short id = opsId++;
199
200
201 if (id == 0) {
202 id = opsId++;
203 }
204 return id;
205 }
206
207 public final boolean isOpen() {
208 return socket.isOpen();
209 }
210
211 @Override
212 public boolean isActive() {
213 return active;
214 }
215
216 @Override
217 public final FileDescriptor fd() {
218 return socket;
219 }
220
221 private AbstractUringUnsafe ioUringUnsafe() {
222 return (AbstractUringUnsafe) unsafe();
223 }
224
225 @Override
226 protected boolean isCompatible(final EventLoop loop) {
227 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
228 }
229
230 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
231 return newDirectBuffer(buf, buf);
232 }
233
234 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
235 final int readableBytes = buf.readableBytes();
236 if (readableBytes == 0) {
237 ReferenceCountUtil.release(holder);
238 return Unpooled.EMPTY_BUFFER;
239 }
240
241 final ByteBufAllocator alloc = alloc();
242 if (alloc.isDirectBufferPooled()) {
243 return newDirectBuffer0(holder, buf, alloc, readableBytes);
244 }
245
246 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
247 if (directBuf == null) {
248 return newDirectBuffer0(holder, buf, alloc, readableBytes);
249 }
250
251 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
252 ReferenceCountUtil.safeRelease(holder);
253 return directBuf;
254 }
255
256 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
257 final ByteBuf directBuf = alloc.directBuffer(capacity);
258 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
259 ReferenceCountUtil.safeRelease(holder);
260 return directBuf;
261 }
262
263
264
265
266
267
268
/**
 * Hook invoked to cancel reads that were submitted but have not completed yet. Called when
 * reading is cleared, when operations are cancelled for close/deregister, and from the
 * multishot branch of read completion handling.
 *
 * @param registration        the registration to submit cancellations to
 * @param numOutstandingReads the current value of the outstanding-read counter
 */
protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
270
271
272
273
274
275
276
/**
 * Hook invoked to cancel writes that were submitted but have not completed yet. Called when
 * operations are cancelled for close/deregister.
 *
 * @param registration         the registration to submit cancellations to
 * @param numOutstandingWrites the current value of the outstanding-write counter
 */
protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
278
/** Intentionally a no-op in this base class. */
@Override
protected void doDisconnect() throws Exception {
}
282
/**
 * Returns the flags to use when building an operation. This implementation returns its
 * argument unchanged; it is non-final, presumably so subclasses can add flags (confirm
 * against the subclasses).
 */
protected byte flags(byte flags) {
return flags;
}
286
287 private void freeRemoteAddressMemory() {
288 if (remoteAddressMemory != null) {
289 Buffer.free(remoteAddressMemory);
290 remoteAddressMemory = null;
291 }
292 }
293
294 private void freeMsgHdrArray() {
295 if (msgHdrMemoryArray != null) {
296 msgHdrMemoryArray.release();
297 msgHdrMemoryArray = null;
298 }
299 }
300
301 @Override
302 protected void doClose() throws Exception {
303 active = false;
304
305 if (registration != null) {
306 if (socket.markClosed()) {
307 int fd = fd().intValue();
308 IoUringIoOps ops = IoUringIoOps.newClose(fd, flags((byte) 0), nextOpsId());
309 registration.submit(ops);
310 }
311 } else {
312
313 socket.close();
314 ioUringUnsafe().freeResourcesNowIfNeeded(null);
315 }
316 }
317
318 @Override
319 protected final void doBeginRead() {
320 if (inputClosedSeenErrorOnRead) {
321
322 return;
323 }
324 if (readPending) {
325
326 return;
327 }
328 readPending = true;
329 if (inReadComplete || !isActive()) {
330
331
332
333 return;
334 }
335 doBeginReadNow();
336 }
337
338 private void doBeginReadNow() {
339 if (inputClosedSeenErrorOnRead) {
340
341 return;
342 }
343 if (!isPollInFirst() ||
344
345
346 socketHasMoreData) {
347
348 ioUringUnsafe().scheduleFirstReadIfNeeded();
349 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
350 ioUringUnsafe().schedulePollIn();
351 }
352 }
353
354 @Override
355 protected void doWrite(ChannelOutboundBuffer in) {
356 scheduleWriteIfNeeded(in, true);
357 }
358
359 protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
360 if ((ioState & WRITE_SCHEDULED) != 0) {
361 return;
362 }
363 if (scheduleWrite(in) > 0) {
364 ioState |= WRITE_SCHEDULED;
365 if (submitAndRunNow && !isWritable()) {
366 submitAndRunNow();
367 }
368 }
369 }
370
/**
 * Hook called after a write was scheduled while the channel is not writable. Does nothing
 * in this base class.
 */
protected void submitAndRunNow() {

}
374
375 private int scheduleWrite(ChannelOutboundBuffer in) {
376 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
377 return 0;
378 }
379 if (in == null) {
380 return 0;
381 }
382
383 int msgCount = in.size();
384 if (msgCount == 0) {
385 return 0;
386 }
387 Object msg = in.current();
388
389 if (msgCount > 1 && in.current() instanceof ByteBuf) {
390 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
391 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
392 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
393
394 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
395 } else {
396 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
397 }
398
399 assert numOutstandingWrites > 0;
400 return numOutstandingWrites;
401 }
402
403 protected final IoRegistration registration() {
404 assert registration != null;
405 return registration;
406 }
407
408 private void schedulePollOut() {
409 pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
410 }
411
412 final void schedulePollRdHup() {
413 pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
414 }
415
416 private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
417 assert (ioState & ioMask) == 0;
418 int fd = fd().intValue();
419 IoRegistration registration = registration();
420 IoUringIoOps ops = IoUringIoOps.newPollAdd(
421 fd, flags((byte) 0), mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
422 long id = registration.submit(ops);
423 if (id != 0) {
424 ioState |= (byte) ioMask;
425 }
426 return id;
427 }
428
429 final void resetCachedAddresses() {
430 local = socket.localAddress();
431 remote = socket.remoteAddress();
432 }
433
434 abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
// Lazily created wrapper around the parent's receive buffer allocator handle.
private IoUringRecvByteAllocatorHandle allocHandle;
// Set once a close completion (other than a cancelled one) was received.
private boolean closed;
// Guards freeResourcesNow(...) so it runs at most once.
private boolean freed;
// Result of socketIsEmpty(flags) for the read completion currently being processed.
private boolean socketIsEmpty;
439
440
441
442
443
/**
 * Schedules a write that covers multiple buffers of {@code in}. Chosen by the caller when
 * more than one message is queued behind a {@code ByteBuf} or the current message spans more
 * than one NIO buffer.
 *
 * @param in the outbound buffer to write from
 * @return the number of writes scheduled; the caller stores it as the outstanding-write count
 *         and asserts that it is positive
 */
protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
445
446
447
448
449
/**
 * Schedules the write of a single message.
 *
 * @param msg the current message of the outbound buffer
 * @return the number of writes scheduled; the caller stores it as the outstanding-write count
 *         and asserts that it is positive
 */
protected abstract int scheduleWriteSingle(Object msg);
451
452 @Override
453 public final void handle(IoRegistration registration, IoEvent ioEvent) {
454 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
455 byte op = event.opcode();
456 int res = event.res();
457 int flags = event.flags();
458 short data = event.data();
459 switch (op) {
460 case Native.IORING_OP_RECV:
461 case Native.IORING_OP_ACCEPT:
462 case Native.IORING_OP_RECVMSG:
463 case Native.IORING_OP_READ:
464 readComplete(op, res, flags, data);
465 break;
466 case Native.IORING_OP_WRITEV:
467 case Native.IORING_OP_SEND:
468 case Native.IORING_OP_SENDMSG:
469 case Native.IORING_OP_WRITE:
470 case Native.IORING_OP_SPLICE:
471 writeComplete(op, res, flags, data);
472 break;
473 case Native.IORING_OP_POLL_ADD:
474 pollAddComplete(res, flags, data);
475 break;
476 case Native.IORING_OP_ASYNC_CANCEL:
477 cancelComplete0(op, res, flags, data);
478 break;
479 case Native.IORING_OP_CONNECT:
480 connectComplete(op, res, flags, data);
481
482
483 freeMsgHdrArray();
484 freeRemoteAddressMemory();
485 break;
486 case Native.IORING_OP_CLOSE:
487 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
488 if (delayedClose != null) {
489 delayedClose.promise.setSuccess();
490 }
491 closed = true;
492 }
493 break;
494 default:
495 break;
496 }
497
498
499
500 handleDelayedClosed();
501
502 if (ioState == 0 && closed) {
503 freeResourcesNowIfNeeded(registration);
504 }
505 }
506
507 private void freeResourcesNowIfNeeded(IoRegistration reg) {
508 if (!freed) {
509 freed = true;
510 freeResourcesNow(reg);
511 }
512 }
513
514
515
516
517
518
519 protected void freeResourcesNow(IoRegistration reg) {
520 freeMsgHdrArray();
521 freeRemoteAddressMemory();
522 if (reg != null) {
523 reg.cancel();
524 }
525 }
526
527 private void handleDelayedClosed() {
528 if (delayedClose != null && canCloseNow()) {
529 closeNow();
530 }
531 }
532
533 private void pollAddComplete(int res, int flags, short data) {
534 if ((res & Native.POLLOUT) != 0) {
535 pollOut(res);
536 }
537 if ((res & Native.POLLIN) != 0) {
538 pollIn(res, flags, data);
539 }
540 if ((res & Native.POLLRDHUP) != 0) {
541 pollRdHup(res);
542 }
543 }
544
545 @Override
546 public final void close() throws Exception {
547 close(voidPromise());
548 }
549
550 @Override
551 protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
552 if (closeFuture().isDone()) {
553
554 safeSetSuccess(promise);
555 return;
556 }
557 if (delayedClose == null) {
558
559
560
561 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
562 } else {
563 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
564 return;
565 }
566
567 boolean cancelConnect = false;
568 try {
569 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
570 if (connectPromise != null) {
571
572 connectPromise.tryFailure(new ClosedChannelException());
573 AbstractIoUringChannel.this.connectPromise = null;
574 cancelConnect = true;
575 }
576
577 cancelConnectTimeoutFuture();
578 } finally {
579
580
581 cancelOps(cancelConnect);
582 }
583
584 if (canCloseNow()) {
585
586 closeNow();
587 }
588 }
589
590 private void cancelOps(boolean cancelConnect) {
591 if (registration == null || !registration.isValid()) {
592 return;
593 }
594 byte flags = flags((byte) 0);
595 if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
596 assert pollRdhupId != 0;
597 long id = registration.submit(
598 IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
599 assert id != 0;
600 }
601 if ((ioState & POLL_IN_SCHEDULED) != 0) {
602 assert pollInId != 0;
603 long id = registration.submit(
604 IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
605 assert id != 0;
606 }
607 if ((ioState & POLL_OUT_SCHEDULED) != 0) {
608 assert pollOutId != 0;
609 long id = registration.submit(
610 IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
611 assert id != 0;
612 }
613 if (cancelConnect && connectId != 0) {
614
615 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
616 assert id != 0;
617 }
618 cancelOutstandingReads(registration, numOutstandingReads);
619 cancelOutstandingWrites(registration, numOutstandingWrites);
620 }
621
622 private boolean canCloseNow() {
623
624
625 return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
626 }
627
628 private void closeNow() {
629 super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
630 }
631
632 @Override
633 protected final void flush0() {
634
635
636
637 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
638 super.flush0();
639 }
640 }
641
642 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
643 if (promise == null) {
644
645 return;
646 }
647
648
649 promise.tryFailure(cause);
650 closeIfClosed();
651 }
652
653 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
654 if (promise == null) {
655
656 return;
657 }
658 active = true;
659
660 if (local == null) {
661 local = socket.localAddress();
662 }
663 computeRemote();
664
665
666 schedulePollRdHup();
667
668
669
670 boolean active = isActive();
671
672
673 boolean promiseSet = promise.trySuccess();
674
675
676
677 if (!wasActive && active) {
678 pipeline().fireChannelActive();
679 }
680
681
682 if (!promiseSet) {
683 close(voidPromise());
684 }
685 }
686
687 @Override
688 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
689 if (allocHandle == null) {
690 allocHandle = new IoUringRecvByteAllocatorHandle(
691 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
692 }
693 return allocHandle;
694 }
695
696 final void shutdownInput(boolean allDataRead) {
697 logger.trace("shutdownInput Fd: {}", fd().intValue());
698 if (!socket.isInputShutdown()) {
699 if (isAllowHalfClosure(config())) {
700 try {
701 socket.shutdown(true, false);
702 } catch (IOException ignored) {
703
704
705 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
706 return;
707 } catch (NotYetConnectedException ignore) {
708
709
710 }
711 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
712 } else {
713
714 inputClosedSeenErrorOnRead = true;
715 close(voidPromise());
716 return;
717 }
718 }
719 if (allDataRead && !inputClosedSeenErrorOnRead) {
720 inputClosedSeenErrorOnRead = true;
721 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
722 }
723 }
724
725 private void fireEventAndClose(Object evt) {
726 pipeline().fireUserEventTriggered(evt);
727 close(voidPromise());
728 }
729
730 final void schedulePollIn() {
731 assert (ioState & POLL_IN_SCHEDULED) == 0;
732 if (!isActive() || shouldBreakIoUringInReady(config())) {
733 return;
734 }
735 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, IoUring.isPollAddMultishotSupported());
736 }
737
/**
 * Handles the completion of a read-type operation: updates the read bookkeeping, delegates
 * the actual processing to {@link #readComplete0}, and afterwards decides whether another
 * read has to be scheduled or an outstanding multishot read has to be cancelled.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the operation
 * @param flags the completion flags
 * @param data  the data value attached to the operation
 */
private void readComplete(byte op, int res, int flags, short data) {
assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;

// An outstanding count of -1 marks a multishot read.
boolean multishot = numOutstandingReads == -1;
// Without IORING_CQE_F_MORE no further completion will follow for this submission.
boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
if (rearm) {
// Nothing more will arrive for this submission, so the read is no longer scheduled.
// This applies to both the multishot and the single-shot case.
ioState &= ~READ_SCHEDULED;
}
// Remember whether a read was requested before the flag is reset below.
boolean pending = readPending;
if (multishot) {
// Reset so that a new read request made while handling this completion can be detected.
readPending = false;
} else if (--numOutstandingReads == 0) {
// All outstanding completions have been received.
readPending = false;
ioState &= ~READ_SCHEDULED;
}
inReadComplete = true;
try {
socketIsEmpty = socketIsEmpty(flags);
socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
(flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
readComplete0(op, res, flags, data, numOutstandingReads);
} finally {
try {
// Has the allocator handle decided that the read loop is finished?
if (recvBufAllocHandle().isReadComplete()) {
// The read loop is done, start from a clean handle next time.
recvBufAllocHandle().reset(config());

if (!multishot) {
if (readPending) {
// Single-shot read and a new read was requested in the meantime.
doBeginReadNow();
}
} else {
// Multishot read: what to do depends on why this completion was delivered.
if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
// The multishot read was cancelled; only restart if a read had been
// requested before this completion was handled.
if (pending) {
doBeginReadNow();
}
} else if (rearm) {
// The multishot read ended, it needs to be submitted again.
doBeginReadNow();
} else if (!readPending) {
// The multishot read is still armed but no new read was requested
// while handling the completion, so cancel it.
cancelOutstandingReads(registration, numOutstandingReads);
}
}
} else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
// Cancelled while the read loop is not finished; only restart if a read had
// been requested before this completion was handled.
if (pending) {
doBeginReadNow();
}
} else if (multishot && rearm) {
// The multishot read ended, it needs to be submitted again.
doBeginReadNow();
}
} finally {
inReadComplete = false;
socketIsEmpty = false;
}
}
}
817
818
819
820
/**
 * Processes the result of a completed read-type operation.
 *
 * @param op                   the opcode of the completed operation
 * @param res                  the result of the operation
 * @param flags                the completion flags
 * @param data                 the data value attached to the operation
 * @param outstandingCompletes the outstanding-read counter after this completion was accounted
 *                             for ({@code -1} while a multishot read is in use)
 */
protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
822
823
824
825
826 private void pollRdHup(int res) {
827 ioState &= ~POLL_RDHUP_SCHEDULED;
828 pollRdhupId = 0;
829 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
830 return;
831 }
832
833
834 recvBufAllocHandle().rdHupReceived();
835
836 if (isActive()) {
837 scheduleFirstReadIfNeeded();
838 } else {
839
840 shutdownInput(false);
841 }
842 }
843
844
845
846
847 private void pollIn(int res, int flags, short data) {
848
849 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
850 if (rearm) {
851 ioState &= ~POLL_IN_SCHEDULED;
852 pollInId = 0;
853 }
854 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
855 return;
856 }
857 if (!readPending) {
858
859
860 socketHasMoreData = true;
861 return;
862 }
863 scheduleFirstReadIfNeeded();
864 }
865
866 private void scheduleFirstReadIfNeeded() {
867 if ((ioState & READ_SCHEDULED) == 0) {
868 scheduleFirstRead();
869 }
870 }
871
872 private void scheduleFirstRead() {
873
874 final ChannelConfig config = config();
875 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
876 allocHandle.reset(config);
877 scheduleRead(true);
878 }
879
880 protected final void scheduleRead(boolean first) {
881
882 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
883 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
884 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
885 ioState |= READ_SCHEDULED;
886 }
887 }
888 }
889
890
891
892
893
894
895
896
897
898
899
/**
 * Submits the read operation(s).
 *
 * @param first         whether this is the first read of a read loop
 * @param socketIsEmpty the value computed by {@code socketIsEmpty(flags)} for the completion
 *                      currently being handled, {@code false} outside of completion handling
 * @return the number of reads submitted; the caller records a read as scheduled when the
 *         value is positive or {@code -1}, and treats {@code -1} as a multishot read
 */
protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
901
902
903
904
905
906
907 private void pollOut(int res) {
908 ioState &= ~POLL_OUT_SCHEDULED;
909 pollOutId = 0;
910 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
911 return;
912 }
913
914 if (connectPromise != null) {
915
916
917
918 assert eventLoop().inEventLoop();
919
920 boolean connectStillInProgress = false;
921 try {
922 boolean wasActive = isActive();
923 if (!socket.finishConnect()) {
924 connectStillInProgress = true;
925 return;
926 }
927 fulfillConnectPromise(connectPromise, wasActive);
928 } catch (Throwable t) {
929 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
930 } finally {
931 if (!connectStillInProgress) {
932
933
934
935 cancelConnectTimeoutFuture();
936 connectPromise = null;
937 } else {
938
939 schedulePollOut();
940 }
941 }
942 } else if (!socket.isOutputShutdown()) {
943
944 super.flush0();
945 }
946 }
947
948
949
950
951
952
953
954
955
956 private void writeComplete(byte op, int res, int flags, short data) {
957 if ((ioState & CONNECT_SCHEDULED) != 0) {
958
959
960 freeMsgHdrArray();
961 if (res > 0) {
962
963 outboundBuffer().removeBytes(res);
964
965
966 connectComplete(op, 0, flags, data);
967 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
968
969
970
971
972 submitConnect((InetSocketAddress) requestedRemoteAddress);
973 } else {
974
975 connectComplete(op, res, flags, data);
976 }
977 return;
978 }
979 assert numOutstandingWrites > 0;
980 --numOutstandingWrites;
981
982 boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
983 if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
984
985
986 schedulePollOut();
987 }
988
989
990
991 if (numOutstandingWrites == 0) {
992 ioState &= ~WRITE_SCHEDULED;
993
994
995 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
996 scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
997 }
998 }
999 }
1000
1001
1002
1003
1004
1005
1006
1007
1008
/**
 * Processes the result of a completed write-type operation.
 *
 * @param op          the opcode of the completed operation
 * @param res         the result of the operation
 * @param flags       the completion flags
 * @param data        the data value attached to the operation
 * @param outstanding the number of writes still outstanding after this completion
 * @return {@code false} makes the caller arm a POLLOUT (if none is armed); {@code true}
 *         lets the caller schedule the next write once no write is outstanding
 */
abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1010
1011
1012
1013
1014
1015
1016
1017
1018
/**
 * Called when the completion of an async-cancel operation is received. Does nothing in this
 * base class.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the operation
 * @param flags the completion flags
 * @param data  the data value attached to the operation
 */
void cancelComplete0(byte op, int res, int flags, short data) {

}
1022
1023
1024
1025
1026
1027
1028
1029
1030 void connectComplete(byte op, int res, int flags, short data) {
1031 ioState &= ~CONNECT_SCHEDULED;
1032 freeRemoteAddressMemory();
1033
1034 if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1035
1036 schedulePollOut();
1037 } else {
1038 try {
1039 if (res == 0) {
1040 fulfillConnectPromise(connectPromise, active);
1041 if (readPending) {
1042 doBeginReadNow();
1043 }
1044 } else {
1045 try {
1046 Errors.throwConnectException("io_uring connect", res);
1047 } catch (Throwable cause) {
1048 fulfillConnectPromise(connectPromise, cause);
1049 }
1050 }
1051 } finally {
1052
1053
1054
1055 cancelConnectTimeoutFuture();
1056 connectPromise = null;
1057 }
1058 }
1059 }
1060
/**
 * Starts a connect to {@code remoteAddress}. The connect is submitted as an io_uring
 * operation (or as a fast-open sendmsg carrying the first flushed buffer when TCP fast open
 * is available and enabled); the promise is completed later from the completion handlers.
 * Failures detected up front fail the promise immediately.
 *
 * @param remoteAddress the address to connect to
 * @param localAddress  the address to bind to first, or {@code null} for none
 * @param promise       the promise to notify; cancelling it closes the channel
 */
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// The promise is deliberately not marked as uncancellable here; cancellation is handled
// by the listener registered at the end of this method.
if (promise.isDone() || !ensureOpen(promise)) {
return;
}

// A close was requested already, so no new connect is started.
if (delayedClose != null) {
promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
return;
}
try {
if (connectPromise != null) {
throw new ConnectionPendingException();
}
if (localAddress instanceof InetSocketAddress) {
checkResolvable((InetSocketAddress) localAddress);
}

if (remoteAddress instanceof InetSocketAddress) {
checkResolvable((InetSocketAddress) remoteAddress);
}

if (remote != null) {
// A remote address is cached already, which is treated as "connected".
throw new AlreadyConnectedException();
}

if (localAddress != null) {
socket.bind(localAddress);
}

// A remote address of another type makes this cast throw; the catch below turns that
// into a failed promise.
InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;

// With TCP fast open, the first flushed ByteBuf (if any) is sent together with the connect.
ByteBuf initialData = null;
if (IoUring.isTcpFastOpenClientSideAvailable() &&
config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
outbound.addFlush();
Object curr;
if ((curr = outbound.current()) instanceof ByteBuf) {
initialData = (ByteBuf) curr;
}
}
if (initialData != null) {
msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
hdr.write(socket, inetSocketAddress, initialData.memoryAddress(),
initialData.readableBytes(), (short) 0);

int fd = fd().intValue();
IoRegistration registration = registration();
IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, flags((byte) 0), Native.MSG_FASTOPEN,
hdr.address(), hdr.idx());
connectId = registration.submit(ops);
if (connectId == 0) {
// Submitting failed, the header memory is not needed.
freeMsgHdrArray();
}
} else {
submitConnect(inetSocketAddress);
}
if (connectId != 0) {
ioState |= CONNECT_SCHEDULED;
}
} catch (Throwable t) {
closeIfClosed();
promise.tryFailure(annotateConnectException(t, remoteAddress));
return;
}
connectPromise = promise;
requestedRemoteAddress = remoteAddress;

// Schedule the connect timeout, if one is configured.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
if (connectPromise != null && !connectPromise.isDone() &&
connectPromise.tryFailure(new ConnectTimeoutException(
"connection timed out: " + remoteAddress))) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
// A cancelled connect also stops the timeout and closes the channel.
if (future.isCancelled()) {
cancelConnectTimeoutFuture();
connectPromise = null;
close(voidPromise());
}
}
});
}
1166 }
1167
1168 private void submitConnect(InetSocketAddress inetSocketAddress) {
1169 remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1170 long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
1171
1172 SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
1173
1174 int fd = fd().intValue();
1175 IoRegistration registration = registration();
1176 IoUringIoOps ops = IoUringIoOps.newConnect(
1177 fd, flags((byte) 0), remoteAddressMemoryAddress, nextOpsId());
1178 connectId = registration.submit(ops);
1179 if (connectId == 0) {
1180
1181 freeRemoteAddressMemory();
1182 }
1183 }
1184
1185 @Override
1186 protected Object filterOutboundMessage(Object msg) {
1187 if (msg instanceof ByteBuf) {
1188 ByteBuf buf = (ByteBuf) msg;
1189 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1190 }
1191 throw new UnsupportedOperationException("unsupported message type");
1192 }
1193
1194 @Override
1195 protected void doRegister(ChannelPromise promise) {
1196 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1197 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1198 if (f.isSuccess()) {
1199 registration = (IoRegistration) f.getNow();
1200 promise.setSuccess();
1201 } else {
1202 promise.setFailure(f.cause());
1203 }
1204 });
1205 }
1206
1207 @Override
1208 protected final void doDeregister() {
1209
1210 ioUringUnsafe().cancelOps(connectPromise != null);
1211 }
1212
1213 @Override
1214 protected void doBind(final SocketAddress local) throws Exception {
1215 if (local instanceof InetSocketAddress) {
1216 checkResolvable((InetSocketAddress) local);
1217 }
1218 socket.bind(local);
1219 this.local = socket.localAddress();
1220 }
1221
1222 protected static void checkResolvable(InetSocketAddress addr) {
1223 if (addr.isUnresolved()) {
1224 throw new UnresolvedAddressException();
1225 }
1226 }
1227
1228 @Override
1229 protected final SocketAddress localAddress0() {
1230 return local;
1231 }
1232
1233 @Override
1234 protected final SocketAddress remoteAddress0() {
1235 return remote;
1236 }
1237
1238 private static boolean isAllowHalfClosure(ChannelConfig config) {
1239 return config instanceof SocketChannelConfig &&
1240 ((SocketChannelConfig) config).isAllowHalfClosure();
1241 }
1242
1243 private void cancelConnectTimeoutFuture() {
1244 if (connectTimeoutFuture != null) {
1245 connectTimeoutFuture.cancel(false);
1246 connectTimeoutFuture = null;
1247 }
1248 }
1249
1250 private void computeRemote() {
1251 if (requestedRemoteAddress instanceof InetSocketAddress) {
1252 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1253 }
1254 }
1255
1256 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1257 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1258 }
1259
1260
1261
1262
1263
1264
1265
/**
 * Decides from the flags of a read completion whether the socket is to be treated as empty.
 * The result is kept for the duration of the completion handling and passed to the read
 * scheduling hook.
 *
 * @param flags the completion flags of the read
 * @return {@code true} if the socket should be considered empty
 */
protected abstract boolean socketIsEmpty(int flags);
1267
/**
 * Selects how reading is started: when this returns {@code true} a POLLIN is armed first
 * (unless the socket is known to hold data); when it returns {@code false} the read is
 * scheduled directly.
 */
abstract boolean isPollInFirst();
1269 }