1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.Errors;
43 import io.netty.channel.unix.FileDescriptor;
44 import io.netty.channel.unix.UnixChannel;
45 import io.netty.channel.unix.UnixChannelUtil;
46 import io.netty.util.ReferenceCountUtil;
47 import io.netty.util.concurrent.PromiseNotifier;
48 import io.netty.util.internal.logging.InternalLogger;
49 import io.netty.util.internal.logging.InternalLoggerFactory;
50
51 import java.io.IOException;
52 import java.net.InetSocketAddress;
53 import java.net.SocketAddress;
54 import java.nio.ByteBuffer;
55 import java.nio.channels.AlreadyConnectedException;
56 import java.nio.channels.ClosedChannelException;
57 import java.nio.channels.ConnectionPendingException;
58 import java.nio.channels.NotYetConnectedException;
59 import java.nio.channels.UnresolvedAddressException;
60 import java.util.concurrent.ScheduledFuture;
61 import java.util.concurrent.TimeUnit;
62
63 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
64 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
65 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
66 import static io.netty.util.internal.ObjectUtil.checkNotNull;
67
68
69 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
// Logger used for the trace output in the constructor and in shutdownInput(...).
70 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
// The underlying socket; this is also what fd() returns.
71 final LinuxSocket socket;
// Backing flag for isActive(); set in the constructors and on a successful connect, cleared in doClose().
72 protected volatile boolean active;
73
74 // Bit flags stored in ioState. A set bit means the matching operation was submitted and is tracked as scheduled.
75 private static final int POLL_IN_SCHEDULED = 1;
76 private static final int POLL_OUT_SCHEDULED = 1 << 2;
77 private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
78 private static final int WRITE_SCHEDULED = 1 << 4;
79 private static final int READ_SCHEDULED = 1 << 5;
80 private static final int CONNECT_SCHEDULED = 1 << 6;
81
// Counter behind nextOpsId(); starts at Short.MIN_VALUE.
82 private short opsId = Short.MIN_VALUE;
83
// Ids returned by IoRegistration.submit(...) for the poll / connect submissions; used to build async-cancel requests.
84 private long pollInId;
85 private long pollOutId;
86 private long pollRdhupId;
87 private long connectId;
88
89 // Combination of the *_SCHEDULED bit flags declared above.
90 private byte ioState;
91
92
93
94 // Value returned by scheduleWriteMultiple/scheduleWriteSingle; decremented for every writeComplete(...).
95 private short numOutstandingWrites;
96 // Value returned by scheduleRead0(...); readComplete(...) treats -1 as a multishot read.
97 private short numOutstandingReads;
98
// Set by doBeginRead() when a read was requested; reset by readComplete(...) and clearRead().
99 private boolean readPending;
// true while readComplete(...) is running; doBeginRead() then only records readPending.
100 private boolean inReadComplete;
// When set, doBeginReadNow() schedules a read directly instead of polling first.
101 private boolean socketHasMoreData;
102
103 private static final class DelayedClose {
104 private final ChannelPromise promise;
105 private final Throwable cause;
106 private final ClosedChannelException closeCause;
107
108 DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
109 this.promise = promise;
110 this.cause = cause;
111 this.closeCause = closeCause;
112 }
113 }
// Non-null once close(...) was requested; holds the promise and causes used when closeNow() runs.
114 private DelayedClose delayedClose;
// Once true, doBeginRead() and doBeginReadNow() return without scheduling anything.
115 private boolean inputClosedSeenErrorOnRead;
116
117
118
119
// Promise of the connect attempt that is currently in flight, or null.
120 private ChannelPromise connectPromise;
// Timeout task created in connect(...); cancelled by cancelConnectTimeoutFuture().
121 private ScheduledFuture<?> connectTimeoutFuture;
// Address passed to connect(...); used by computeRemote() and to annotate connect failures.
122 private SocketAddress requestedRemoteAddress;
// Native memory holding the socket address of a submitted connect; released by freeRemoteAddressMemory().
123 private ByteBuffer remoteAddressMemory;
// Message header memory used for the TCP fast-open sendmsg; released by freeMsgHdrArray().
124 private MsgHdrMemoryArray msgHdrMemoryArray;
125
// Assigned in doRegister(...) once registration with the event loop succeeded; null before that.
126 private IoRegistration registration;
127
// Cached addresses returned by localAddress0() / remoteAddress0().
128 private volatile SocketAddress local;
129 private volatile SocketAddress remote;
130
131 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
132 super(parent);
133 this.socket = checkNotNull(socket, "fd");
134
135 if (active) {
136
137
138 this.active = true;
139 this.local = socket.localAddress();
140 this.remote = socket.remoteAddress();
141 }
142
143 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
144 }
145
/**
 * Creates an already active channel for the given socket and remote address.
 *
 * @param parent the parent channel, passed to the super constructor
 * @param fd     the socket to use; must not be {@code null}
 * @param remote the remote address to cache as-is; the local address is read from the socket
 */
AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent);
    this.socket = checkNotNull(fd, "fd");
    this.active = true;

    // The remote address is supplied by the caller; only the local one is looked up.
    this.remote = remote;
    this.local = fd.localAddress();
}
156
157
158 final void autoReadCleared() {
159 if (!isRegistered()) {
160 return;
161 }
162 readPending = false;
163 IoRegistration registration = this.registration;
164 if (registration == null || !registration.isValid()) {
165 return;
166 }
167 if (eventLoop().inEventLoop()) {
168 clearRead();
169 } else {
170 eventLoop().execute(this::clearRead);
171 }
172 }
173
174 private void clearRead() {
175 assert eventLoop().inEventLoop();
176 readPending = false;
177 IoRegistration registration = this.registration;
178 if (registration == null || !registration.isValid()) {
179 return;
180 }
181 if ((ioState & POLL_IN_SCHEDULED) != 0) {
182
183 assert pollInId != 0;
184 long id = registration.submit(
185 IoUringIoOps.newAsyncCancel((byte) 0, pollInId, Native.IORING_OP_POLL_ADD));
186 assert id != 0;
187 ioState &= ~POLL_IN_SCHEDULED;
188 }
189
190 cancelOutstandingReads(registration(), numOutstandingReads);
191 }
192
193
194
195
196
197
198 protected final short nextOpsId() {
199 short id = opsId++;
200
201
202 if (id == 0) {
203 id = opsId++;
204 }
205 return id;
206 }
207
208 public final boolean isOpen() {
209 return socket.isOpen();
210 }
211
212 @Override
213 public boolean isActive() {
214 return active;
215 }
216
217 @Override
218 public final FileDescriptor fd() {
219 return socket;
220 }
221
222 private AbstractUringUnsafe ioUringUnsafe() {
223 return (AbstractUringUnsafe) unsafe();
224 }
225
226 @Override
227 protected boolean isCompatible(final EventLoop loop) {
228 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
229 }
230
231 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
232 return newDirectBuffer(buf, buf);
233 }
234
235 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
236 final int readableBytes = buf.readableBytes();
237 if (readableBytes == 0) {
238 ReferenceCountUtil.release(holder);
239 return Unpooled.EMPTY_BUFFER;
240 }
241
242 final ByteBufAllocator alloc = alloc();
243 if (alloc.isDirectBufferPooled()) {
244 return newDirectBuffer0(holder, buf, alloc, readableBytes);
245 }
246
247 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
248 if (directBuf == null) {
249 return newDirectBuffer0(holder, buf, alloc, readableBytes);
250 }
251
252 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
253 ReferenceCountUtil.safeRelease(holder);
254 return directBuf;
255 }
256
257 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
258 final ByteBuf directBuf = alloc.directBuffer(capacity);
259 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
260 ReferenceCountUtil.safeRelease(holder);
261 return directBuf;
262 }
263
264
265
266
267
268
269
/**
 * Hook for cancelling reads that are still outstanding. In this class it is invoked from
 * clearRead(), cancelOps(...) and readComplete(...) with the current {@code numOutstandingReads}
 * (which may be {@code -1}, the value readComplete(...) treats as a multishot read).
 *
 * @param registration        the registration of this channel
 * @param numOutstandingReads the current outstanding-read counter
 */
270 protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
271
272
273
274
275
276
277
/**
 * Hook for cancelling writes that are still outstanding. In this class it is invoked from
 * cancelOps(...) with the current {@code numOutstandingWrites}.
 *
 * @param registration         the registration of this channel
 * @param numOutstandingWrites the current outstanding-write counter
 */
278 protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
279
280 @Override
281 protected void doDisconnect() throws Exception {
282 }
283
284 private void freeRemoteAddressMemory() {
285 if (remoteAddressMemory != null) {
286 Buffer.free(remoteAddressMemory);
287 remoteAddressMemory = null;
288 }
289 }
290
291 private void freeMsgHdrArray() {
292 if (msgHdrMemoryArray != null) {
293 msgHdrMemoryArray.release();
294 msgHdrMemoryArray = null;
295 }
296 }
297
298 @Override
299 protected void doClose() throws Exception {
300 active = false;
301
302 if (registration != null) {
303 if (socket.markClosed()) {
304 int fd = fd().intValue();
305 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
306 registration.submit(ops);
307 }
308 } else {
309
310 socket.close();
311 ioUringUnsafe().freeResourcesNowIfNeeded(null);
312 }
313 }
314
315 @Override
316 protected final void doBeginRead() {
317 if (inputClosedSeenErrorOnRead) {
318
319 return;
320 }
321 if (readPending) {
322
323 return;
324 }
325 readPending = true;
326 if (inReadComplete || !isActive()) {
327
328
329
330 return;
331 }
332 doBeginReadNow();
333 }
334
335 private void doBeginReadNow() {
336 if (inputClosedSeenErrorOnRead) {
337
338 return;
339 }
340 if (!isPollInFirst() ||
341
342
343 socketHasMoreData) {
344
345 ioUringUnsafe().scheduleFirstReadIfNeeded();
346 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
347 ioUringUnsafe().schedulePollIn();
348 }
349 }
350
351 @Override
352 protected void doWrite(ChannelOutboundBuffer in) {
353 scheduleWriteIfNeeded(in, true);
354 }
355
356 protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
357 if ((ioState & WRITE_SCHEDULED) != 0) {
358 return;
359 }
360 if (scheduleWrite(in) > 0) {
361 ioState |= WRITE_SCHEDULED;
362 if (submitAndRunNow && !isWritable()) {
363 submitAndRunNow();
364 }
365 }
366 }
367
368 protected void submitAndRunNow() {
369
370 }
371
372 private int scheduleWrite(ChannelOutboundBuffer in) {
373 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
374 return 0;
375 }
376 if (in == null) {
377 return 0;
378 }
379
380 int msgCount = in.size();
381 if (msgCount == 0) {
382 return 0;
383 }
384 Object msg = in.current();
385
386 if (msgCount > 1 && in.current() instanceof ByteBuf) {
387 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
388 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
389 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
390
391 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
392 } else {
393 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
394 }
395
396 assert numOutstandingWrites > 0;
397 return numOutstandingWrites;
398 }
399
400 protected final IoRegistration registration() {
401 assert registration != null;
402 return registration;
403 }
404
405 private void schedulePollOut() {
406 pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
407 }
408
409 final void schedulePollRdHup() {
410 pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
411 }
412
413 private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
414 assert (ioState & ioMask) == 0;
415 int fd = fd().intValue();
416 IoRegistration registration = registration();
417 IoUringIoOps ops = IoUringIoOps.newPollAdd(
418 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
419 long id = registration.submit(ops);
420 if (id != 0) {
421 ioState |= (byte) ioMask;
422 }
423 return id;
424 }
425
426 final void resetCachedAddresses() {
427 local = socket.localAddress();
428 remote = socket.remoteAddress();
429 }
430
431 abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
// Created lazily by recvBufAllocHandle().
432 private IoUringRecvByteAllocatorHandle allocHandle;
// Set in handle(...) once an IORING_OP_CLOSE completion with a result other than ECANCELED was seen.
433 private boolean closed;
// Guard used by freeResourcesNowIfNeeded(...) so freeResourcesNow(...) runs at most once.
434 private boolean freed;
// Result of socketIsEmpty(flags) for the completion being processed; reset to false when readComplete(...) ends.
435 private boolean socketIsEmpty;
436
437
438
439
440
/**
 * Schedules the write of multiple messages / buffers taken from {@code in}. The returned value is
 * stored in {@code numOutstandingWrites} by scheduleWrite(...), which asserts that it is positive.
 */
441 protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
442
443
444
445
446
/**
 * Schedules the write of a single message. The returned value is stored in
 * {@code numOutstandingWrites} by scheduleWrite(...), which asserts that it is positive.
 */
447 protected abstract int scheduleWriteSingle(Object msg);
448
449 @Override
450 public final void handle(IoRegistration registration, IoEvent ioEvent) {
451 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
452 byte op = event.opcode();
453 int res = event.res();
454 int flags = event.flags();
455 short data = event.data();
456 switch (op) {
457 case Native.IORING_OP_RECV:
458 case Native.IORING_OP_ACCEPT:
459 case Native.IORING_OP_RECVMSG:
460 case Native.IORING_OP_READ:
461 readComplete(op, res, flags, data);
462 break;
463 case Native.IORING_OP_WRITEV:
464 case Native.IORING_OP_SEND:
465 case Native.IORING_OP_SENDMSG:
466 case Native.IORING_OP_WRITE:
467 case Native.IORING_OP_SPLICE:
468 writeComplete(op, res, flags, data);
469 break;
470 case Native.IORING_OP_POLL_ADD:
471 pollAddComplete(res, flags, data);
472 break;
473 case Native.IORING_OP_ASYNC_CANCEL:
474 cancelComplete0(op, res, flags, data);
475 break;
476 case Native.IORING_OP_CONNECT:
477 connectComplete(op, res, flags, data);
478
479
480 freeMsgHdrArray();
481 freeRemoteAddressMemory();
482 break;
483 case Native.IORING_OP_CLOSE:
484 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
485 if (delayedClose != null) {
486 delayedClose.promise.setSuccess();
487 }
488 closed = true;
489 }
490 break;
491 default:
492 break;
493 }
494
495
496
497 handleDelayedClosed();
498
499 if (ioState == 0 && closed) {
500 freeResourcesNowIfNeeded(registration);
501 }
502 }
503
504 private void freeResourcesNowIfNeeded(IoRegistration reg) {
505 if (!freed) {
506 freed = true;
507 freeResourcesNow(reg);
508 }
509 }
510
511
512
513
514
515
516 protected void freeResourcesNow(IoRegistration reg) {
517 freeMsgHdrArray();
518 freeRemoteAddressMemory();
519 if (reg != null) {
520 reg.cancel();
521 }
522 }
523
524 private void handleDelayedClosed() {
525 if (delayedClose != null && canCloseNow()) {
526 closeNow();
527 }
528 }
529
530 private void pollAddComplete(int res, int flags, short data) {
531 if ((res & Native.POLLOUT) != 0) {
532 pollOut(res);
533 }
534 if ((res & Native.POLLIN) != 0) {
535 pollIn(res, flags, data);
536 }
537 if ((res & Native.POLLRDHUP) != 0) {
538 pollRdHup(res);
539 }
540 }
541
542 @Override
543 public final void close() throws Exception {
544 close(voidPromise());
545 }
546
/**
 * Starts closing the channel. The close request is recorded in {@code delayedClose}, a pending
 * connect is failed, outstanding operations are cancelled, and the actual close (closeNow()) is
 * executed immediately only when canCloseNow() allows it; otherwise handle(...) performs it later
 * via handleDelayedClosed().
 */
547 @Override
548 protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
549 if (closeFuture().isDone()) {
550 // The close future is already complete: just complete the given promise.
551 safeSetSuccess(promise);
552 return;
553 }
554 if (delayedClose == null) {
555 // First close request: remember its arguments. A void promise is replaced by a new promise,
556 // because handle(...) completes delayedClose.promise and the else-branch below adds listeners to it.
557
558 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
559 } else {
// A close is already in progress: have the given promise notified when the first one completes.
560 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
561 return;
562 }
563
564 boolean cancelConnect = false;
565 try {
566 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
567 if (connectPromise != null) {
568 // A connect is still pending: fail its promise and remember to cancel the connect below.
569 connectPromise.tryFailure(new ClosedChannelException());
570 AbstractIoUringChannel.this.connectPromise = null;
571 cancelConnect = true;
572 }
573
574 cancelConnectTimeoutFuture();
575 } finally {
576 // Runs even if the block above throws.
577
578 cancelOps(cancelConnect);
579 }
580
581 if (canCloseNow()) {
582 // No read or write is marked as scheduled, so close right away.
583 closeNow();
584 }
585 }
586
587 private void cancelOps(boolean cancelConnect) {
588 if (registration == null || !registration.isValid()) {
589 return;
590 }
591 byte flags = (byte) 0;
592 if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
593 long id = registration.submit(
594 IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
595 assert id != 0;
596 pollRdhupId = 0;
597 }
598 if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
599 long id = registration.submit(
600 IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
601 assert id != 0;
602 pollInId = 0;
603 }
604 if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
605 long id = registration.submit(
606 IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
607 assert id != 0;
608 pollOutId = 0;
609 }
610 if (cancelConnect && connectId != 0) {
611
612 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
613 assert id != 0;
614 connectId = 0;
615 }
616 cancelOutstandingReads(registration, numOutstandingReads);
617 cancelOutstandingWrites(registration, numOutstandingWrites);
618 }
619
620 private boolean canCloseNow() {
621
622
623 return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
624 }
625
626 private void closeNow() {
627 super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
628 }
629
630 @Override
631 protected final void flush0() {
632
633
634
635 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
636 super.flush0();
637 }
638 }
639
640 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
641 if (promise == null) {
642
643 return;
644 }
645
646
647 promise.tryFailure(cause);
648 closeIfClosed();
649 }
650
651 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
652 if (promise == null) {
653
654 return;
655 }
656 active = true;
657
658 if (local == null) {
659 local = socket.localAddress();
660 }
661 computeRemote();
662
663
664 schedulePollRdHup();
665
666
667
668 boolean active = isActive();
669
670
671 boolean promiseSet = promise.trySuccess();
672
673
674
675 if (!wasActive && active) {
676 pipeline().fireChannelActive();
677 }
678
679
680 if (!promiseSet) {
681 close(voidPromise());
682 }
683 }
684
685 @Override
686 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
687 if (allocHandle == null) {
688 allocHandle = new IoUringRecvByteAllocatorHandle(
689 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
690 }
691 return allocHandle;
692 }
693
694 final void shutdownInput(boolean allDataRead) {
695 logger.trace("shutdownInput Fd: {}", fd().intValue());
696 if (!socket.isInputShutdown()) {
697 if (isAllowHalfClosure(config())) {
698 try {
699 socket.shutdown(true, false);
700 } catch (IOException ignored) {
701
702
703 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
704 return;
705 } catch (NotYetConnectedException ignore) {
706
707
708 }
709 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
710 } else {
711
712 inputClosedSeenErrorOnRead = true;
713 close(voidPromise());
714 return;
715 }
716 }
717 if (allDataRead && !inputClosedSeenErrorOnRead) {
718 inputClosedSeenErrorOnRead = true;
719 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
720 }
721 }
722
723 private void fireEventAndClose(Object evt) {
724 pipeline().fireUserEventTriggered(evt);
725 close(voidPromise());
726 }
727
728 final void schedulePollIn() {
729 assert (ioState & POLL_IN_SCHEDULED) == 0;
730 if (!isActive() || shouldBreakIoUringInReady(config())) {
731 return;
732 }
733 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, IoUring.isPollAddMultishotEnabled());
734 }
735
/**
 * Handles the completion of a read-like operation (see the opcodes routed here by handle(...)).
 * Updates the bookkeeping ({@code READ_SCHEDULED}, {@code numOutstandingReads}, {@code readPending}),
 * delegates the actual processing to readComplete0(...) and afterwards decides whether another read
 * has to be scheduled or an outstanding multishot read has to be cancelled.
 */
736 private void readComplete(byte op, int res, int flags, short data) {
737 assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;
738
739 boolean multishot = numOutstandingReads == -1;
740 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
741 if (rearm) {
742 // IORING_CQE_F_MORE is not set, so the read has to be re-armed: drop READ_SCHEDULED.
743 // This applies to the multishot and the non-multishot case alike.
744 ioState &= ~READ_SCHEDULED;
745 }
// Remember readPending before it is reset below; used for the ECANCELED handling.
746 boolean pending = readPending;
747 if (multishot) {
748 // Reset readPending so that a doBeginRead() issued while the completion is processed
749 // can be detected afterwards.
750 readPending = false;
751 } else if (--numOutstandingReads == 0) {
752 // The last outstanding completion was received.
753 readPending = false;
754 ioState &= ~READ_SCHEDULED;
755 }
// While this flag is set doBeginRead() only records readPending.
756 inReadComplete = true;
757 try {
758 socketIsEmpty = socketIsEmpty(flags);
759 socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
760 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
761 readComplete0(op, res, flags, data, numOutstandingReads);
762 } finally {
763 try {
764 // Check whether the allocator handle considers the read loop complete.
765 if (recvBufAllocHandle().isReadComplete()) {
766 // Reset the handle for the next read loop.
767 recvBufAllocHandle().reset(config());
768
769
770 if (!multishot) {
771 if (readPending) {
772 // Non-multishot read and a new read was requested in the meantime:
773 // schedule it now.
774 doBeginReadNow();
775 }
776 } else {
777 // Multishot read: three cases are distinguished below.
778
779
780 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
781 // The read was cancelled. If a read was pending when this completion
782 // arrived, schedule a new one.
783
784
785 if (pending) {
786 doBeginReadNow();
787 }
788 } else if (rearm) {
789 // The multishot read has to be re-armed.
790 doBeginReadNow();
791 } else if (!readPending) {
792 // The read is still armed but no new read was requested while the
793 // completion was handled: cancel it.
794 cancelOutstandingReads(registration, numOutstandingReads);
795 }
796 }
797 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
798 // The read loop is not complete but the read was cancelled: schedule a new read
799 // if one was pending when this completion arrived.
800
801
802 if (pending) {
803 doBeginReadNow();
804 }
805 } else if (multishot && rearm) {
806 // The multishot read has to be re-armed.
807 doBeginReadNow();
808 }
809 } finally {
810 inReadComplete = false;
811 socketIsEmpty = false;
812 }
813 }
814 }
815
816
817
818
/**
 * Processes a read completion; called by readComplete(...) after it updated its bookkeeping.
 *
 * @param op                   the opcode of the completed operation
 * @param res                  the result of the completion
 * @param flags                the flags of the completion
 * @param data                 the data of the completion
 * @param outstandingCompletes the value of {@code numOutstandingReads} at the time of the call
 */
819 protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
820
821
822
823
824 private void pollRdHup(int res) {
825 ioState &= ~POLL_RDHUP_SCHEDULED;
826 pollRdhupId = 0;
827 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
828 return;
829 }
830
831
832 recvBufAllocHandle().rdHupReceived();
833
834 if (isActive()) {
835 scheduleFirstReadIfNeeded();
836 } else {
837
838 shutdownInput(false);
839 }
840 }
841
842
843
844
845 private void pollIn(int res, int flags, short data) {
846
847 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
848 if (rearm) {
849 ioState &= ~POLL_IN_SCHEDULED;
850 pollInId = 0;
851 }
852 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
853 return;
854 }
855 if (!readPending) {
856
857
858 socketHasMoreData = true;
859 return;
860 }
861 scheduleFirstReadIfNeeded();
862 }
863
864 private void scheduleFirstReadIfNeeded() {
865 if ((ioState & READ_SCHEDULED) == 0) {
866 scheduleFirstRead();
867 }
868 }
869
870 private void scheduleFirstRead() {
871
872 final ChannelConfig config = config();
873 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
874 allocHandle.reset(config);
875 scheduleRead(true);
876 }
877
878 protected final void scheduleRead(boolean first) {
879
880 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
881 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
882 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
883 ioState |= READ_SCHEDULED;
884 }
885 }
886 }
887
888
889
890
891
892
893
894
895
896
897
/**
 * Schedules the actual read. The returned value is stored in {@code numOutstandingReads} by
 * scheduleRead(...): a positive value or {@code -1} marks a read as scheduled, and readComplete(...)
 * treats {@code -1} as a multishot read.
 *
 * @param first         the {@code first} argument passed to scheduleRead(...)
 * @param socketIsEmpty the current value of the {@code socketIsEmpty} field
 */
898 protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
899
900
901
902
903
904
/**
 * Handles the completion of the POLLOUT poll. The scheduled flag and id are always reset. Unless
 * the poll was cancelled, either a pending connect is finished or, when no connect is pending and
 * the output is not shut down, the outbound buffer is flushed.
 */
905 private void pollOut(int res) {
906 ioState &= ~POLL_OUT_SCHEDULED;
907 pollOutId = 0;
908 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
909 return;
910 }
911
912 if (connectPromise != null) {
913 // A connect attempt is pending: try to finish it.
914
915
916 assert eventLoop().inEventLoop();
917
918 boolean connectStillInProgress = false;
919 try {
920 boolean wasActive = isActive();
921 if (!socket.finishConnect()) {
// Not connected yet: the finally block re-registers for POLLOUT.
922 connectStillInProgress = true;
923 return;
924 }
925 fulfillConnectPromise(connectPromise, wasActive);
926 } catch (Throwable t) {
927 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
928 } finally {
929 if (!connectStillInProgress) {
930 // The attempt is finished (successfully or not): cancel the timeout and
931 // forget the promise.
932
933 cancelConnectTimeoutFuture();
934 connectPromise = null;
935 } else {
936 // The connect is not done yet: poll for POLLOUT again.
937 schedulePollOut();
938 }
939 }
940 } else if (!socket.isOutputShutdown()) {
941 // No connect pending: flush the outbound buffer.
942 super.flush0();
943 }
944 }
945
946
947
948
949
950
951
952
953
/**
 * Handles the completion of a write-like operation (see the opcodes routed here by handle(...)).
 * While {@code CONNECT_SCHEDULED} is set, the completion belongs to the sendmsg submitted by
 * connect(...) and is translated into a connect result. Otherwise the outstanding-write counter is
 * decremented, writeComplete0(...) is invoked and a POLLOUT or a further write is scheduled.
 */
954 private void writeComplete(byte op, int res, int flags, short data) {
955 if ((ioState & CONNECT_SCHEDULED) != 0) {
956 // This completion belongs to the sendmsg(...) that connect(...) submitted with MSG_FASTOPEN.
957
958 freeMsgHdrArray();
959 if (res > 0) {
960 // Bytes were sent: remove them from the outbound buffer.
961 outboundBuffer().removeBytes(res);
962
963 // Report the connect as successful (connectComplete(...) treats 0 as success).
964 connectComplete(op, 0, flags, data);
965 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
966 // Nothing was sent and the connection is not established yet:
967 // fall back to submitting a regular connect.
968
969
970 submitConnect((InetSocketAddress) requestedRemoteAddress);
971 } else {
972 // Any other result is handled like a connect error.
973 connectComplete(op, res, flags, data);
974 }
975 return;
976 }
977 assert numOutstandingWrites > 0;
978 --numOutstandingWrites;
979
980 boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
981 if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
982 // Not everything was written and no POLLOUT is scheduled yet: schedule one.
983
984 schedulePollOut();
985 }
986
987 // WRITE_SCHEDULED is only cleared after writeComplete0(...) returned and once no write
988 // completion is outstanding anymore.
989 if (numOutstandingWrites == 0) {
990 ioState &= ~WRITE_SCHEDULED;
991
992 // Everything was written and no POLLOUT is scheduled: try to schedule the next write.
993 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
994 scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
995 }
996 }
997 }
998
999
1000
1001
1002
1003
1004
1005
1006
/**
 * Processes a write completion; called by writeComplete(...) after it decremented the
 * outstanding-write counter.
 *
 * @param op          the opcode of the completed operation
 * @param res         the result of the completion
 * @param flags       the flags of the completion
 * @param data        the data of the completion
 * @param outstanding the number of write completions still outstanding
 * @return the value writeComplete(...) stores as {@code writtenAll}: when {@code false} a POLLOUT
 *         is scheduled, when {@code true} the next write may be scheduled
 */
1007 abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017 void cancelComplete0(byte op, int res, int flags, short data) {
1018
1019 }
1020
1021
1022
1023
1024
1025
1026
1027
1028 void connectComplete(byte op, int res, int flags, short data) {
1029 ioState &= ~CONNECT_SCHEDULED;
1030 freeRemoteAddressMemory();
1031
1032 if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1033
1034 schedulePollOut();
1035 } else {
1036 try {
1037 if (res == 0) {
1038 fulfillConnectPromise(connectPromise, active);
1039 if (readPending) {
1040 doBeginReadNow();
1041 }
1042 } else {
1043 try {
1044 Errors.throwConnectException("io_uring connect", res);
1045 } catch (Throwable cause) {
1046 fulfillConnectPromise(connectPromise, cause);
1047 }
1048 }
1049 } finally {
1050
1051
1052
1053 cancelConnectTimeoutFuture();
1054 connectPromise = null;
1055 }
1056 }
1057 }
1058
/**
 * Starts a connect attempt. After validating the state and the addresses, either a sendmsg with
 * MSG_FASTOPEN (when TCP fast-open is available, enabled and the first flushed message is a
 * ByteBuf) or a regular connect is submitted. On success the promise and the requested address are
 * remembered, an optional connect timeout is scheduled, and a listener closes the channel when the
 * promise gets cancelled. Any failure while submitting fails the promise with an annotated exception.
 */
1059 @Override
1060 public void connect(
1061 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
1062 // Nothing to do when the promise is already done or the channel is not open
1063 // (ensureOpen(...) receives the promise).
1064 if (promise.isDone() || !ensureOpen(promise)) {
1065 return;
1066 }
1067
// A close was already requested: fail the connect right away.
1068 if (delayedClose != null) {
1069 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
1070 return;
1071 }
1072 try {
1073 if (connectPromise != null) {
1074 throw new ConnectionPendingException();
1075 }
1076 if (localAddress instanceof InetSocketAddress) {
1077 checkResolvable((InetSocketAddress) localAddress);
1078 }
1079
1080 if (remoteAddress instanceof InetSocketAddress) {
1081 checkResolvable((InetSocketAddress) remoteAddress);
1082 }
1083
1084 if (remote != null) {
1085 // A remote address is already cached, so the channel is treated as connected.
1086
1087
1088 throw new AlreadyConnectedException();
1089 }
1090
1091 if (localAddress != null) {
1092 socket.bind(localAddress);
1093 }
1094
1095 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
1096
1097 ByteBuf initialData = null;
1098 if (IoUring.isTcpFastOpenClientSideAvailable() &&
1099 config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
1100 ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
1101 outbound.addFlush();
1102 Object curr;
1103 if ((curr = outbound.current()) instanceof ByteBuf) {
1104 initialData = (ByteBuf) curr;
1105 }
1106 }
1107 if (initialData != null) {
1108 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
1109 MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
1110 hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
1111 initialData.readableBytes(), (short) 0);
1112
1113 int fd = fd().intValue();
1114 IoRegistration registration = registration();
1115 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
1116 hdr.address(), hdr.idx());
1117 connectId = registration.submit(ops);
1118 if (connectId == 0) {
1119 // Nothing was submitted, so the header memory is not needed.
1120 freeMsgHdrArray();
1121 }
1122 } else {
1123 submitConnect(inetSocketAddress);
1124 }
1125 if (connectId != 0) {
1126 ioState |= CONNECT_SCHEDULED;
1127 }
1128 } catch (Throwable t) {
1129 closeIfClosed();
1130 promise.tryFailure(annotateConnectException(t, remoteAddress));
1131 return;
1132 }
1133 connectPromise = promise;
1134 requestedRemoteAddress = remoteAddress;
1135 // Schedule the connect timeout when one is configured.
1136 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1137 if (connectTimeoutMillis > 0) {
1138 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
1139 @Override
1140 public void run() {
1141 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
1142 if (connectPromise != null && !connectPromise.isDone() &&
1143 connectPromise.tryFailure(new ConnectTimeoutException(
1144 "connection timed out: " + remoteAddress))) {
1145 close(voidPromise());
1146 }
1147 }
1148 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1149 }
1150
1151 promise.addListener(new ChannelFutureListener() {
1152 @Override
1153 public void operationComplete(ChannelFuture future) {
1154 // When the connect promise is cancelled, also cancel the timeout and close
1155 // the channel.
1156 if (future.isCancelled()) {
1157 cancelConnectTimeoutFuture();
1158 connectPromise = null;
1159 close(voidPromise());
1160 }
1161 }
1162 });
1163 }
1164 }
1165
1166 private void submitConnect(InetSocketAddress inetSocketAddress) {
1167 remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1168
1169 SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1170
1171 int fd = fd().intValue();
1172 IoRegistration registration = registration();
1173 IoUringIoOps ops = IoUringIoOps.newConnect(
1174 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1175 connectId = registration.submit(ops);
1176 if (connectId == 0) {
1177
1178 freeRemoteAddressMemory();
1179 }
1180 }
1181
1182 @Override
1183 protected Object filterOutboundMessage(Object msg) {
1184 if (msg instanceof ByteBuf) {
1185 ByteBuf buf = (ByteBuf) msg;
1186 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1187 }
1188 throw new UnsupportedOperationException("unsupported message type");
1189 }
1190
1191 @Override
1192 protected void doRegister(ChannelPromise promise) {
1193 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1194 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1195 if (f.isSuccess()) {
1196 registration = (IoRegistration) f.getNow();
1197 promise.setSuccess();
1198 } else {
1199 promise.setFailure(f.cause());
1200 }
1201 });
1202 }
1203
1204 @Override
1205 protected final void doDeregister() {
1206
1207 ioUringUnsafe().cancelOps(connectPromise != null);
1208 }
1209
1210 @Override
1211 protected void doBind(final SocketAddress local) throws Exception {
1212 if (local instanceof InetSocketAddress) {
1213 checkResolvable((InetSocketAddress) local);
1214 }
1215 socket.bind(local);
1216 this.local = socket.localAddress();
1217 }
1218
1219 protected static void checkResolvable(InetSocketAddress addr) {
1220 if (addr.isUnresolved()) {
1221 throw new UnresolvedAddressException();
1222 }
1223 }
1224
1225 @Override
1226 protected final SocketAddress localAddress0() {
1227 return local;
1228 }
1229
1230 @Override
1231 protected final SocketAddress remoteAddress0() {
1232 return remote;
1233 }
1234
1235 private static boolean isAllowHalfClosure(ChannelConfig config) {
1236 return config instanceof SocketChannelConfig &&
1237 ((SocketChannelConfig) config).isAllowHalfClosure();
1238 }
1239
1240 private void cancelConnectTimeoutFuture() {
1241 if (connectTimeoutFuture != null) {
1242 connectTimeoutFuture.cancel(false);
1243 connectTimeoutFuture = null;
1244 }
1245 }
1246
1247 private void computeRemote() {
1248 if (requestedRemoteAddress instanceof InetSocketAddress) {
1249 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1250 }
1251 }
1252
1253 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1254 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1255 }
1256
1257
1258
1259
1260
1261
1262
/**
 * Derives from the flags of a read completion whether the socket is considered empty. Called by
 * readComplete(...); the result is passed to scheduleRead0(...) while that completion is processed.
 *
 * @param flags the flags of the completion
 */
1263 protected abstract boolean socketIsEmpty(int flags);
1264
/**
 * Whether doBeginReadNow() should schedule a POLLIN before reading. When this returns {@code false}
 * a read is scheduled directly.
 */
1265 abstract boolean isPollInFirst();
1266 }