1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.DomainSocketAddress;
43 import io.netty.channel.unix.Errors;
44 import io.netty.channel.unix.FileDescriptor;
45 import io.netty.channel.unix.UnixChannel;
46 import io.netty.channel.unix.UnixChannelUtil;
47 import io.netty.util.ReferenceCountUtil;
48 import io.netty.util.concurrent.PromiseNotifier;
49 import io.netty.util.internal.logging.InternalLogger;
50 import io.netty.util.internal.logging.InternalLoggerFactory;
51
52 import java.io.IOException;
53 import java.net.InetSocketAddress;
54 import java.net.SocketAddress;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.AlreadyConnectedException;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.ConnectionPendingException;
59 import java.nio.channels.NotYetConnectedException;
60 import java.nio.channels.UnresolvedAddressException;
61 import java.util.concurrent.ScheduledFuture;
62 import java.util.concurrent.TimeUnit;
63
64 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
65 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
66 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
67 import static io.netty.util.internal.ObjectUtil.checkNotNull;
68
69
70 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
71 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
72 final LinuxSocket socket;
73 protected volatile boolean active;
74
75
// Bit flags stored in ioState. A flag is set while the corresponding operation is scheduled
// and cleared again by the completion handlers of the unsafe.
private static final int POLL_IN_SCHEDULED = 1;
private static final int POLL_OUT_SCHEDULED = 1 << 2;
private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
private static final int WRITE_SCHEDULED = 1 << 4;
private static final int READ_SCHEDULED = 1 << 5;
private static final int CONNECT_SCHEDULED = 1 << 6;

// Counter backing nextOpsId(); starts at the smallest short value.
private short opsId = Short.MIN_VALUE;

// Ids returned by registration.submit(...) for the respective operation; they are used as the
// target of async-cancel requests.
private long pollInId;
private long pollOutId;
private long pollRdhupId;
private long connectId;

// Combination of the *_SCHEDULED flags above.
private byte ioState;

// Number of write completions that are still expected (decremented in writeComplete(...)).
private short numOutstandingWrites;
// Number of read completions that are still expected; -1 is treated as a multishot read.
private short numOutstandingReads;

// True once doBeginRead() was called and the request was not yet satisfied or cleared.
private boolean readPending;
// True while readComplete(...) runs; doBeginRead() then only records readPending.
private boolean inReadComplete;
// When true, doBeginReadNow() schedules a read directly instead of scheduling POLLIN first.
private boolean socketHasMoreData;
103
104 private static final class DelayedClose {
105 private final ChannelPromise promise;
106 private final Throwable cause;
107 private final ClosedChannelException closeCause;
108
109 DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
110 this.promise = promise;
111 this.cause = cause;
112 this.closeCause = closeCause;
113 }
114 }
// Set by the unsafe's close(...) once a close was requested; while non-null no new reads,
// writes or connects are scheduled.
private DelayedClose delayedClose;
// Once true, doBeginRead() / doBeginReadNow() return without scheduling anything.
private boolean inputClosedSeenErrorOnRead;

// State of a connect attempt; connectPromise is null when no connect is pending.
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
// Direct memory holding the socket address passed to the connect operation; released by
// freeRemoteAddressMemory().
private ByteBuffer remoteAddressMemory;
// Message header memory used by the sendmsg based connect path; released by freeMsgHdrArray().
private MsgHdrMemoryArray msgHdrMemoryArray;

// Assigned in doRegister(...) once the registration with the event loop succeeded.
private IoRegistration registration;

// Cached addresses returned by localAddress0() and remoteAddress0().
private volatile SocketAddress local;
private volatile SocketAddress remote;
131
132 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
133 super(parent);
134 this.socket = checkNotNull(socket, "fd");
135
136 if (active) {
137
138
139 this.active = true;
140 this.local = socket.localAddress();
141 this.remote = socket.remoteAddress();
142 }
143
144 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
145 }
146
/**
 * Creates an already active channel for {@code fd} with a known remote address.
 *
 * @param parent the parent channel, passed to the super constructor
 * @param fd     the socket to wrap, must not be {@code null}
 * @param remote the remote address to cache as-is; the local address is read from {@code fd}
 */
AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    active = true;

    // The remote address is supplied by the caller; only the local one is looked up.
    this.remote = remote;
    local = fd.localAddress();
}
157
158
159 final void autoReadCleared() {
160 if (!isRegistered()) {
161 return;
162 }
163 readPending = false;
164 IoRegistration registration = this.registration;
165 if (registration == null || !registration.isValid()) {
166 return;
167 }
168 if (eventLoop().inEventLoop()) {
169 clearRead();
170 } else {
171 eventLoop().execute(this::clearRead);
172 }
173 }
174
175 private void clearRead() {
176 assert eventLoop().inEventLoop();
177 readPending = false;
178 IoRegistration registration = this.registration;
179 if (registration == null || !registration.isValid()) {
180 return;
181 }
182 if ((ioState & POLL_IN_SCHEDULED) != 0) {
183
184 assert pollInId != 0;
185 long id = registration.submit(
186 IoUringIoOps.newAsyncCancel((byte) 0, pollInId, Native.IORING_OP_POLL_ADD));
187 assert id != 0;
188 ioState &= ~POLL_IN_SCHEDULED;
189 }
190
191 cancelOutstandingReads(registration(), numOutstandingReads);
192 }
193
194
195
196
197
198
199 protected final short nextOpsId() {
200 short id = opsId++;
201
202
203 if (id == 0) {
204 id = opsId++;
205 }
206 return id;
207 }
208
209 public final boolean isOpen() {
210 return socket.isOpen();
211 }
212
213 @Override
214 public boolean isActive() {
215 return active;
216 }
217
218 @Override
219 public final FileDescriptor fd() {
220 return socket;
221 }
222
223 private AbstractUringUnsafe ioUringUnsafe() {
224 return (AbstractUringUnsafe) unsafe();
225 }
226
227 @Override
228 protected boolean isCompatible(final EventLoop loop) {
229 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
230 }
231
232 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
233 return newDirectBuffer(buf, buf);
234 }
235
236 protected boolean allowMultiShotPollIn() {
237 return IoUring.isPollAddMultishotEnabled();
238 }
239
240 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
241 final int readableBytes = buf.readableBytes();
242 if (readableBytes == 0) {
243 ReferenceCountUtil.release(holder);
244 return Unpooled.EMPTY_BUFFER;
245 }
246
247 final ByteBufAllocator alloc = alloc();
248 if (alloc.isDirectBufferPooled()) {
249 return newDirectBuffer0(holder, buf, alloc, readableBytes);
250 }
251
252 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
253 if (directBuf == null) {
254 return newDirectBuffer0(holder, buf, alloc, readableBytes);
255 }
256
257 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
258 ReferenceCountUtil.safeRelease(holder);
259 return directBuf;
260 }
261
262 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
263 final ByteBuf directBuf = alloc.directBuffer(capacity);
264 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
265 ReferenceCountUtil.safeRelease(holder);
266 return directBuf;
267 }
268
269
270
271
272
273
274
/**
 * Hook invoked when outstanding reads should be cancelled. In this class it is called from
 * {@code clearRead()}, {@code cancelOps(...)} and {@code readComplete(...)}.
 *
 * @param registration        the registration of this channel
 * @param numOutstandingReads the current value of {@code numOutstandingReads}
 *                            ({@code -1} is treated as a multishot read by this class)
 */
protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
276
277
278
279
280
281
282
/**
 * Hook invoked from {@code cancelOps(...)} when outstanding writes should be cancelled.
 *
 * @param registration         the registration of this channel
 * @param numOutstandingWrites the current value of {@code numOutstandingWrites}
 */
protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
284
/**
 * Intentionally empty in this base class.
 */
@Override
protected void doDisconnect() throws Exception {
}
288
289 private void freeRemoteAddressMemory() {
290 if (remoteAddressMemory != null) {
291 Buffer.free(remoteAddressMemory);
292 remoteAddressMemory = null;
293 }
294 }
295
296 private void freeMsgHdrArray() {
297 if (msgHdrMemoryArray != null) {
298 msgHdrMemoryArray.release();
299 msgHdrMemoryArray = null;
300 }
301 }
302
303 @Override
304 protected void doClose() throws Exception {
305 active = false;
306
307 if (registration != null) {
308 if (socket.markClosed()) {
309 int fd = fd().intValue();
310 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
311 registration.submit(ops);
312 }
313 } else {
314
315 socket.close();
316 ioUringUnsafe().freeResourcesNowIfNeeded(null);
317 }
318 }
319
320 @Override
321 protected final void doBeginRead() {
322 if (inputClosedSeenErrorOnRead) {
323
324 return;
325 }
326 if (readPending) {
327
328 return;
329 }
330 readPending = true;
331 if (inReadComplete || !isActive()) {
332
333
334
335 return;
336 }
337 doBeginReadNow();
338 }
339
340 private void doBeginReadNow() {
341 if (inputClosedSeenErrorOnRead) {
342
343 return;
344 }
345 if (!isPollInFirst() ||
346
347
348 socketHasMoreData) {
349
350 ioUringUnsafe().scheduleFirstReadIfNeeded();
351 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
352 ioUringUnsafe().schedulePollIn();
353 }
354 }
355
356 @Override
357 protected void doWrite(ChannelOutboundBuffer in) {
358 scheduleWriteIfNeeded(in, true);
359 }
360
361 protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
362 if ((ioState & WRITE_SCHEDULED) != 0) {
363 return;
364 }
365 if (scheduleWrite(in) > 0) {
366 ioState |= WRITE_SCHEDULED;
367 if (submitAndRunNow && !isWritable()) {
368 submitAndRunNow();
369 }
370 }
371 }
372
/**
 * Hook called by {@link #scheduleWriteIfNeeded(ChannelOutboundBuffer, boolean)} after a write
 * was scheduled while the channel is not writable. The default implementation does nothing.
 */
protected void submitAndRunNow() {

}
376
377 private int scheduleWrite(ChannelOutboundBuffer in) {
378 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
379 return 0;
380 }
381 if (in == null) {
382 return 0;
383 }
384
385 int msgCount = in.size();
386 if (msgCount == 0) {
387 return 0;
388 }
389 Object msg = in.current();
390
391 if (msgCount > 1 && in.current() instanceof ByteBuf) {
392 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
393 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
394 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
395
396 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
397 } else {
398 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
399 }
400
401 assert numOutstandingWrites > 0;
402 return numOutstandingWrites;
403 }
404
405 protected final IoRegistration registration() {
406 assert registration != null;
407 return registration;
408 }
409
410 private void schedulePollOut() {
411 pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
412 }
413
414 final void schedulePollRdHup() {
415 pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
416 }
417
418 private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
419 assert (ioState & ioMask) == 0;
420 int fd = fd().intValue();
421 IoRegistration registration = registration();
422 IoUringIoOps ops = IoUringIoOps.newPollAdd(
423 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
424 long id = registration.submit(ops);
425 if (id != 0) {
426 ioState |= (byte) ioMask;
427 }
428 return id;
429 }
430
431 final void resetCachedAddresses() {
432 local = socket.localAddress();
433 remote = socket.remoteAddress();
434 }
435
436 protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
// Lazily created by recvBufAllocHandle().
private IoUringRecvByteAllocatorHandle allocHandle;
// Set once a close operation completed with a result other than ECANCELED.
private boolean closed;
// Guards freeResourcesNow(...) so it runs at most once.
private boolean freed;
// Result of socketIsEmpty(flags) for the completion currently handled by readComplete(...).
private boolean socketIsEmpty;
441
442
443
444
445
/**
 * Schedules the write of multiple buffers taken from {@code in}.
 *
 * @param in the outbound buffer to write from
 * @return the number of scheduled write operations; the caller stores it in
 *         {@code numOutstandingWrites} and asserts that it is greater than {@code 0}
 */
protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
447
448
449
450
451
/**
 * Schedules the write of a single message.
 *
 * @param msg the message to write
 * @return the number of scheduled write operations; the caller stores it in
 *         {@code numOutstandingWrites} and asserts that it is greater than {@code 0}
 */
protected abstract int scheduleWriteSingle(Object msg);
453
454 @Override
455 public final void handle(IoRegistration registration, IoEvent ioEvent) {
456 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
457 byte op = event.opcode();
458 int res = event.res();
459 int flags = event.flags();
460 short data = event.data();
461 switch (op) {
462 case Native.IORING_OP_RECV:
463 case Native.IORING_OP_ACCEPT:
464 case Native.IORING_OP_RECVMSG:
465 case Native.IORING_OP_READ:
466 readComplete(op, res, flags, data);
467 break;
468 case Native.IORING_OP_WRITEV:
469 case Native.IORING_OP_SEND:
470 case Native.IORING_OP_SENDMSG:
471 case Native.IORING_OP_WRITE:
472 case Native.IORING_OP_SPLICE:
473 writeComplete(op, res, flags, data);
474 break;
475 case Native.IORING_OP_POLL_ADD:
476 pollAddComplete(res, flags, data);
477 break;
478 case Native.IORING_OP_ASYNC_CANCEL:
479 cancelComplete0(op, res, flags, data);
480 break;
481 case Native.IORING_OP_CONNECT:
482 connectComplete(op, res, flags, data);
483
484
485 freeMsgHdrArray();
486 freeRemoteAddressMemory();
487 break;
488 case Native.IORING_OP_CLOSE:
489 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
490 if (delayedClose != null) {
491 delayedClose.promise.setSuccess();
492 }
493 closed = true;
494 }
495 break;
496 default:
497 break;
498 }
499
500
501
502 handleDelayedClosed();
503
504 if (ioState == 0 && closed) {
505 freeResourcesNowIfNeeded(registration);
506 }
507 }
508
509 private void freeResourcesNowIfNeeded(IoRegistration reg) {
510 if (!freed) {
511 freed = true;
512 freeResourcesNow(reg);
513 }
514 }
515
516
517
518
519
520
521 protected void freeResourcesNow(IoRegistration reg) {
522 freeMsgHdrArray();
523 freeRemoteAddressMemory();
524 if (reg != null) {
525 reg.cancel();
526 }
527 }
528
529 private void handleDelayedClosed() {
530 if (delayedClose != null && canCloseNow()) {
531 closeNow();
532 }
533 }
534
535 private void pollAddComplete(int res, int flags, short data) {
536 if ((res & Native.POLLOUT) != 0) {
537 pollOut(res);
538 }
539 if ((res & Native.POLLIN) != 0) {
540 pollIn(res, flags, data);
541 }
542 if ((res & Native.POLLRDHUP) != 0) {
543 pollRdHup(res);
544 }
545 }
546
547 @Override
548 public final void close() throws Exception {
549 close(voidPromise());
550 }
551
552 @Override
553 protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
554 if (closeFuture().isDone()) {
555
556 safeSetSuccess(promise);
557 return;
558 }
559 if (delayedClose == null) {
560
561
562
563 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
564 } else {
565 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
566 return;
567 }
568
569 boolean cancelConnect = false;
570 try {
571 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
572 if (connectPromise != null) {
573
574 connectPromise.tryFailure(new ClosedChannelException());
575 AbstractIoUringChannel.this.connectPromise = null;
576 cancelConnect = true;
577 }
578
579 cancelConnectTimeoutFuture();
580 } finally {
581
582
583 cancelOps(cancelConnect);
584 }
585
586 if (canCloseNow()) {
587
588 closeNow();
589 }
590 }
591
592 private void cancelOps(boolean cancelConnect) {
593 if (registration == null || !registration.isValid()) {
594 return;
595 }
596 byte flags = (byte) 0;
597 if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
598 long id = registration.submit(
599 IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
600 assert id != 0;
601 pollRdhupId = 0;
602 }
603 if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
604 long id = registration.submit(
605 IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
606 assert id != 0;
607 pollInId = 0;
608 }
609 if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
610 long id = registration.submit(
611 IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
612 assert id != 0;
613 pollOutId = 0;
614 }
615 if (cancelConnect && connectId != 0) {
616
617 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
618 assert id != 0;
619 connectId = 0;
620 }
621 cancelOutstandingReads(registration, numOutstandingReads);
622 cancelOutstandingWrites(registration, numOutstandingWrites);
623 }
624
625 private boolean canCloseNow() {
626
627
628 return (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
629 }
630
631 private void closeNow() {
632 super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
633 }
634
635 @Override
636 protected final void flush0() {
637
638
639
640 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
641 super.flush0();
642 }
643 }
644
645 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
646 if (promise == null) {
647
648 return;
649 }
650
651
652 promise.tryFailure(cause);
653 closeIfClosed();
654 }
655
656 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
657 if (promise == null) {
658
659 return;
660 }
661 active = true;
662
663 if (local == null) {
664 local = socket.localAddress();
665 }
666 computeRemote();
667
668
669 schedulePollRdHup();
670
671
672
673 boolean active = isActive();
674
675
676 boolean promiseSet = promise.trySuccess();
677
678
679
680 if (!wasActive && active) {
681 pipeline().fireChannelActive();
682 }
683
684
685 if (!promiseSet) {
686 close(voidPromise());
687 }
688 }
689
690 @Override
691 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
692 if (allocHandle == null) {
693 allocHandle = new IoUringRecvByteAllocatorHandle(
694 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
695 }
696 return allocHandle;
697 }
698
699 final void shutdownInput(boolean allDataRead) {
700 logger.trace("shutdownInput Fd: {}", fd().intValue());
701 if (!socket.isInputShutdown()) {
702 if (isAllowHalfClosure(config())) {
703 try {
704 socket.shutdown(true, false);
705 } catch (IOException ignored) {
706
707
708 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
709 return;
710 } catch (NotYetConnectedException ignore) {
711
712
713 }
714 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
715 } else {
716
717 inputClosedSeenErrorOnRead = true;
718 close(voidPromise());
719 return;
720 }
721 }
722 if (allDataRead && !inputClosedSeenErrorOnRead) {
723 inputClosedSeenErrorOnRead = true;
724 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
725 }
726 }
727
728 private void fireEventAndClose(Object evt) {
729 pipeline().fireUserEventTriggered(evt);
730 close(voidPromise());
731 }
732
733 final void schedulePollIn() {
734 assert (ioState & POLL_IN_SCHEDULED) == 0;
735 if (!isActive() || shouldBreakIoUringInReady(config())) {
736 return;
737 }
738 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
739 }
740
/**
 * Handles the completion of a read-type operation: updates the read bookkeeping, delegates
 * the processing of the result to {@link #readComplete0(byte, int, int, short, int)} and then
 * decides whether another read has to be scheduled or the outstanding one cancelled.
 */
private void readComplete(byte op, int res, int flags, short data) {
    assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;

    // -1 marks a multishot read.
    boolean multishot = numOutstandingReads == -1;
    // Without IORING_CQE_F_MORE the code treats the operation as finished, so it must be re-armed.
    boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
    if (rearm) {
        // Applies to both the multishot and the non-multishot case.
        ioState &= ~READ_SCHEDULED;
    }
    // Remember the value before it is reset below; needed for the cancelled case.
    boolean pending = readPending;
    if (multishot) {
        // Reset so that a doBeginRead() issued while handling this completion can be detected.
        readPending = false;
    } else if (--numOutstandingReads == 0) {
        // All outstanding completions were received.
        readPending = false;
        ioState &= ~READ_SCHEDULED;
    }
    inReadComplete = true;
    try {
        socketIsEmpty = socketIsEmpty(flags);
        socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
                (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
        readComplete0(op, res, flags, data, numOutstandingReads);
    } finally {
        try {
            // Check whether the allocator handle considers the read loop done.
            if (recvBufAllocHandle().isReadComplete()) {
                // Reset the handle for the next read loop.
                recvBufAllocHandle().reset(config());

                if (!multishot) {
                    if (readPending) {
                        // A new read was requested while this completion was handled.
                        doBeginReadNow();
                    }
                } else {
                    // Multishot read: three cases have to be distinguished.
                    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                        // The read was cancelled; only read again if a read was pending when
                        // this completion arrived.
                        if (pending) {
                            doBeginReadNow();
                        }
                    } else if (rearm) {
                        // The multishot read ended and needs to be scheduled again.
                        doBeginReadNow();
                    } else if (!readPending) {
                        // No new read was requested while handling the completion, so cancel
                        // the multishot read.
                        cancelOutstandingReads(registration, numOutstandingReads);
                    }
                }
            } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                // The read was cancelled; only read again if a read was pending when this
                // completion arrived.
                if (pending) {
                    doBeginReadNow();
                }
            } else if (multishot && rearm) {
                // The multishot read ended and needs to be scheduled again.
                doBeginReadNow();
            }
        } finally {
            inReadComplete = false;
            socketIsEmpty = false;
        }
    }
}
820
821
822
823
/**
 * Processes the result of a completed read-type operation. Called from
 * {@code readComplete(...)} after the bookkeeping was updated.
 *
 * @param op                   the opcode of the completed operation
 * @param res                  the result of the completion
 * @param flags                the flags of the completion
 * @param data                 the data of the completion
 * @param outstandingCompletes the value of {@code numOutstandingReads} after the update
 */
protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
825
826
827
828
829 private void pollRdHup(int res) {
830 ioState &= ~POLL_RDHUP_SCHEDULED;
831 pollRdhupId = 0;
832 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
833 return;
834 }
835
836
837 recvBufAllocHandle().rdHupReceived();
838
839 if (isActive()) {
840 scheduleFirstReadIfNeeded();
841 } else {
842
843 shutdownInput(false);
844 }
845 }
846
847
848
849
850 private void pollIn(int res, int flags, short data) {
851
852 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
853 if (rearm) {
854 ioState &= ~POLL_IN_SCHEDULED;
855 pollInId = 0;
856 }
857 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
858 return;
859 }
860 if (!readPending) {
861
862
863 socketHasMoreData = true;
864 return;
865 }
866 scheduleFirstReadIfNeeded();
867 }
868
869 private void scheduleFirstReadIfNeeded() {
870 if ((ioState & READ_SCHEDULED) == 0) {
871 scheduleFirstRead();
872 }
873 }
874
875 private void scheduleFirstRead() {
876
877 final ChannelConfig config = config();
878 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
879 allocHandle.reset(config);
880 scheduleRead(true);
881 }
882
883 protected final void scheduleRead(boolean first) {
884
885 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
886 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
887 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
888 ioState |= READ_SCHEDULED;
889 }
890 }
891 }
892
893
894
895
896
897
898
899
900
901
902
/**
 * Schedules one or more reads.
 *
 * @param first         whether this is the first read of a read loop
 * @param socketIsEmpty the result of {@code socketIsEmpty(flags)} for the completion that is
 *                      currently handled, or {@code false} outside of {@code readComplete(...)}
 * @return the number of scheduled reads, or {@code -1} which the caller treats as a multishot
 *         read; for any other value the caller does not mark a read as scheduled
 */
protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
904
905
906
907
908
909
/**
 * Handles a POLLOUT completion. With a pending connect it tries to finish the connect,
 * otherwise it flushes again unless the output is shut down.
 */
private void pollOut(int res) {
    ioState &= ~POLL_OUT_SCHEDULED;
    pollOutId = 0;
    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        return;
    }

    if (connectPromise != null) {
        // A connect is pending; POLLOUT signals that it may be finished now.
        assert eventLoop().inEventLoop();

        boolean connectStillInProgress = false;
        try {
            boolean wasActive = isActive();
            if (!socket.finishConnect()) {
                // The finally block below schedules POLLOUT again.
                connectStillInProgress = true;
                return;
            }
            fulfillConnectPromise(connectPromise, wasActive);
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
        } finally {
            if (!connectStillInProgress) {
                // The connect attempt is over (success or failure); drop its state.
                cancelConnectTimeoutFuture();
                connectPromise = null;
            } else {
                // Not connected yet, wait for the next POLLOUT.
                schedulePollOut();
            }
        }
    } else if (!socket.isOutputShutdown()) {
        // Try writing again.
        super.flush0();
    }
}
950
951
952
953
954
955
956
957
958
/**
 * Handles the completion of a write-type operation. While a connect is scheduled the
 * completion belongs to the sendmsg that {@code connect(...)} submitted and is handled as part
 * of the connect; otherwise the write bookkeeping is updated and, if needed, POLLOUT or the
 * next write is scheduled.
 */
private void writeComplete(byte op, int res, int flags, short data) {
    if ((ioState & CONNECT_SCHEDULED) != 0) {
        // This completion is the result of the sendmsg submitted by connect(...).
        freeMsgHdrArray();
        if (res > 0) {
            // Data was sent, so the connect succeeded; drop the sent bytes.
            outboundBuffer().removeBytes(res);

            // Pass 0, which connectComplete(...) treats as success.
            connectComplete(op, 0, flags, data);
        } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
            // Nothing was sent; fall back to a regular connect.
            submitConnect((InetSocketAddress) requestedRemoteAddress);
        } else {
            // Any other result is handled like a connect error.
            connectComplete(op, res, flags, data);
        }
        return;
    }
    assert numOutstandingWrites > 0;
    --numOutstandingWrites;

    boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
    if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
        // Not everything could be written; wait until the socket is writable again.
        schedulePollOut();
    }

    // WRITE_SCHEDULED is cleared only after writeComplete0(...) returned, so no new write can
    // be scheduled while it runs.
    if (numOutstandingWrites == 0) {
        ioState &= ~WRITE_SCHEDULED;

        // Everything was written and no POLLOUT is scheduled: try to write more.
        if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
            scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
        }
    }
}
1003
1004
1005
1006
1007
1008
1009
1010
1011
/**
 * Processes the result of a completed write-type operation.
 *
 * @param op          the opcode of the completed operation
 * @param res         the result of the completion
 * @param flags       the flags of the completion
 * @param data        the data of the completion
 * @param outstanding the number of write completions still expected after this one
 * @return {@code true} if everything was written; on {@code false} the caller schedules
 *         POLLOUT unless one is already scheduled
 */
abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1013
1014
1015
1016
1017
1018
1019
1020
1021
/**
 * Called from {@code handle(...)} for the completion of an async-cancel operation.
 * The default implementation does nothing.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the completion
 * @param flags the flags of the completion
 * @param data  the data of the completion
 */
void cancelComplete0(byte op, int res, int flags, short data) {

}
1025
1026
1027
1028
1029
1030
1031
1032
1033 void connectComplete(byte op, int res, int flags, short data) {
1034 ioState &= ~CONNECT_SCHEDULED;
1035 freeRemoteAddressMemory();
1036
1037 if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1038
1039 schedulePollOut();
1040 } else {
1041 try {
1042 if (res == 0) {
1043 fulfillConnectPromise(connectPromise, active);
1044 if (readPending) {
1045 doBeginReadNow();
1046 }
1047 } else {
1048 try {
1049 Errors.throwConnectException("io_uring connect", res);
1050 } catch (Throwable cause) {
1051 fulfillConnectPromise(connectPromise, cause);
1052 }
1053 }
1054 } finally {
1055
1056
1057
1058 cancelConnectTimeoutFuture();
1059 connectPromise = null;
1060 }
1061 }
1062 }
1063
/**
 * Starts a connect to {@code remoteAddress}, optionally binding to {@code localAddress} first.
 * The connect is submitted asynchronously; {@code promise} is completed by the completion
 * handlers, by the connect timeout or by a close. Failures while submitting fail the promise
 * directly.
 *
 * @param remoteAddress the address to connect to; an {@link InetSocketAddress} or a
 *                      {@link DomainSocketAddress}
 * @param localAddress  the address to bind to, or {@code null}
 * @param promise       the promise to notify
 */
@Override
public void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // The promise is not marked uncancellable here; cancellation is handled by the listener
    // added at the end of this method.
    if (promise.isDone() || !ensureOpen(promise)) {
        return;
    }

    if (delayedClose != null) {
        // A close was already requested.
        promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
        return;
    }
    try {
        if (connectPromise != null) {
            throw new ConnectionPendingException();
        }
        if (localAddress instanceof InetSocketAddress) {
            checkResolvable((InetSocketAddress) localAddress);
        }

        if (remoteAddress instanceof InetSocketAddress) {
            checkResolvable((InetSocketAddress) remoteAddress);
        }

        if (remote != null) {
            // A remote address is already cached, so the channel is treated as connected.
            throw new AlreadyConnectedException();
        }

        if (localAddress != null) {
            socket.bind(localAddress);
        }

        if (remoteAddress instanceof InetSocketAddress) {
            InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
            ByteBuf initialData = null;
            if (IoUring.isTcpFastOpenClientSideAvailable() &&
                config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
                // With fast-open enabled, the first flushed ByteBuf (if any) is sent together
                // with the connect.
                ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
                outbound.addFlush();
                Object curr;
                if ((curr = outbound.current()) instanceof ByteBuf) {
                    initialData = (ByteBuf) curr;
                }
            }
            if (initialData != null) {
                // Connect by submitting a sendmsg with MSG_FASTOPEN; its completion is handled
                // in writeComplete(...).
                msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
                MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
                hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
                        initialData.readableBytes(), (short) 0);

                int fd = fd().intValue();
                IoRegistration registration = registration();
                IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
                        hdr.address(), hdr.idx());
                connectId = registration.submit(ops);
                if (connectId == 0) {
                    // Submitting failed, release the memory right away.
                    freeMsgHdrArray();
                }
            } else {
                submitConnect(inetSocketAddress);
            }
        } else if (remoteAddress instanceof DomainSocketAddress) {
            DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
            submitConnect(unixDomainSocketAddress);
        } else {
            throw new Error("Unexpected SocketAddress implementation " + remoteAddress);
        }

        if (connectId != 0) {
            ioState |= CONNECT_SCHEDULED;
        }
    } catch (Throwable t) {
        closeIfClosed();
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        return;
    }
    connectPromise = promise;
    requestedRemoteAddress = remoteAddress;

    // Schedule the connect timeout, if one is configured.
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                // Fail the connect and close the channel if the promise is still pending.
                ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
                if (connectPromise != null && !connectPromise.isDone() &&
                        connectPromise.tryFailure(new ConnectTimeoutException(
                                "connection timed out: " + remoteAddress))) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    promise.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            // When the connect promise is cancelled, also cancel the timeout and close the
            // channel.
            if (future.isCancelled()) {
                cancelConnectTimeoutFuture();
                connectPromise = null;
                close(voidPromise());
            }
        }
    });
}
1176 }
1177
1178 private void submitConnect(InetSocketAddress inetSocketAddress) {
1179 remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1180
1181 SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1182
1183 int fd = fd().intValue();
1184 IoRegistration registration = registration();
1185 IoUringIoOps ops = IoUringIoOps.newConnect(
1186 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1187 connectId = registration.submit(ops);
1188 if (connectId == 0) {
1189
1190 freeRemoteAddressMemory();
1191 }
1192 }
1193
1194 private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1195 remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1196 SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1197 int fd = fd().intValue();
1198 IoRegistration registration = registration();
1199 long addr = Buffer.memoryAddress(remoteAddressMemory);
1200 IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, Native.SIZEOF_SOCKADDR_UN, nextOpsId());
1201 connectId = registration.submit(ops);
1202 if (connectId == 0) {
1203
1204 freeRemoteAddressMemory();
1205 }
1206 }
1207
1208 @Override
1209 protected Object filterOutboundMessage(Object msg) {
1210 if (msg instanceof ByteBuf) {
1211 ByteBuf buf = (ByteBuf) msg;
1212 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1213 }
1214 throw new UnsupportedOperationException("unsupported message type");
1215 }
1216
1217 @Override
1218 protected void doRegister(ChannelPromise promise) {
1219 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1220 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1221 if (f.isSuccess()) {
1222 registration = (IoRegistration) f.getNow();
1223 promise.setSuccess();
1224 } else {
1225 promise.setFailure(f.cause());
1226 }
1227 });
1228 }
1229
1230 @Override
1231 protected final void doDeregister() {
1232
1233 ioUringUnsafe().cancelOps(connectPromise != null);
1234 }
1235
1236 @Override
1237 protected void doBind(final SocketAddress local) throws Exception {
1238 if (local instanceof InetSocketAddress) {
1239 checkResolvable((InetSocketAddress) local);
1240 }
1241 socket.bind(local);
1242 this.local = socket.localAddress();
1243 }
1244
1245 protected static void checkResolvable(InetSocketAddress addr) {
1246 if (addr.isUnresolved()) {
1247 throw new UnresolvedAddressException();
1248 }
1249 }
1250
1251 @Override
1252 protected final SocketAddress localAddress0() {
1253 return local;
1254 }
1255
1256 @Override
1257 protected final SocketAddress remoteAddress0() {
1258 return remote;
1259 }
1260
1261 private static boolean isAllowHalfClosure(ChannelConfig config) {
1262 return config instanceof SocketChannelConfig &&
1263 ((SocketChannelConfig) config).isAllowHalfClosure();
1264 }
1265
1266 private void cancelConnectTimeoutFuture() {
1267 if (connectTimeoutFuture != null) {
1268 connectTimeoutFuture.cancel(false);
1269 connectTimeoutFuture = null;
1270 }
1271 }
1272
1273 private void computeRemote() {
1274 if (requestedRemoteAddress instanceof InetSocketAddress) {
1275 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1276 }
1277 }
1278
1279 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1280 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1281 }
1282
1283
1284
1285
1286
1287
1288
/**
 * Tells whether the socket is empty, judged from the flags of a read completion. The result
 * is handed to {@code scheduleRead0(...)} for reads scheduled while that completion is handled.
 *
 * @param flags the flags of the completion
 * @return {@code true} if the socket is considered empty
 */
protected abstract boolean socketIsEmpty(int flags);
1290
/**
 * Tells whether POLLIN should be scheduled before a read. When this returns {@code false},
 * {@code doBeginReadNow()} schedules the read directly.
 *
 * @return {@code true} if POLLIN should be scheduled first
 */
abstract boolean isPollInFirst();
1292 }