1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractChannel;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.ServerChannel;
38 import io.netty.channel.socket.ChannelInputShutdownEvent;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.SocketChannelConfig;
41 import io.netty.channel.unix.Buffer;
42 import io.netty.channel.unix.DomainSocketAddress;
43 import io.netty.channel.unix.Errors;
44 import io.netty.channel.unix.FileDescriptor;
45 import io.netty.channel.unix.UnixChannel;
46 import io.netty.channel.unix.UnixChannelUtil;
47 import io.netty.util.ReferenceCountUtil;
48 import io.netty.util.concurrent.PromiseNotifier;
49 import io.netty.util.internal.CleanableDirectBuffer;
50 import io.netty.util.internal.logging.InternalLogger;
51 import io.netty.util.internal.logging.InternalLoggerFactory;
52
53 import java.io.IOException;
54 import java.net.InetSocketAddress;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.AlreadyConnectedException;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.ConnectionPendingException;
60 import java.nio.channels.NotYetConnectedException;
61 import java.nio.channels.UnresolvedAddressException;
62 import java.util.concurrent.ScheduledFuture;
63 import java.util.concurrent.TimeUnit;
64
65 import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
66 import static io.netty.channel.unix.Errors.ERROR_EALREADY_NEGATIVE;
67 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
68 import static io.netty.util.internal.ObjectUtil.checkNotNull;
69 import static io.netty.util.internal.StringUtil.className;
70
71
72 abstract class AbstractIoUringChannel extends AbstractChannel implements UnixChannel {
// Logger shared by this class; used for trace output in the constructor and in shutdownInput(...).
73 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringChannel.class);
// The socket this channel operates on; also what fd() returns.
74 final LinuxSocket socket;
// Value returned by isActive(); set on construction/connect completion and cleared in doClose().
75 protected volatile boolean active;
76
77
// Bit flags stored in ioState. Each one marks a kind of operation that is currently scheduled.
78 private static final int POLL_IN_SCHEDULED = 1;
79 private static final int POLL_OUT_SCHEDULED = 1 << 2;
80 private static final int POLL_RDHUP_SCHEDULED = 1 << 3;
81 private static final int WRITE_SCHEDULED = 1 << 4;
82 private static final int READ_SCHEDULED = 1 << 5;
83 private static final int CONNECT_SCHEDULED = 1 << 6;
84
// Counter behind nextOpsId(); nextOpsId() never hands out 0.
85 private short opsId = Short.MIN_VALUE;
86
// Ids returned by registration.submit(...) for the respective operation; reset to 0 once the
// operation completed or a cancel was submitted for it.
87 private long pollInId;
88 private long pollOutId;
89 private long pollRdhupId;
90 private long connectId;
91
92
// Bitmask built from the *_SCHEDULED constants above.
93 private byte ioState;
94
95
96
97
// Number of write completions that are still expected.
98 private short numOutstandingWrites;
99
// Number of read completions that are still expected; readComplete(...) treats -1 as a multishot read.
100 private short numOutstandingReads;
101
// Set by doBeginRead() and cleared when a read completes or clearRead() runs.
102 private boolean readPending;
// True only while readComplete(...) is executing; doBeginRead() defers scheduling while it is set.
103 private boolean inReadComplete;
// Set when a completion indicated more data (IORING_CQE_F_SOCK_NONEMPTY) or when a POLLIN fired
// while no read was pending; doBeginReadNow() then schedules a read without polling first.
104 private boolean socketHasMoreData;
105
106 private static final class DelayedClose {
107 private final ChannelPromise promise;
108 private final Throwable cause;
109 private final ClosedChannelException closeCause;
110
111 DelayedClose(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
112 this.promise = promise;
113 this.cause = cause;
114 this.closeCause = closeCause;
115 }
116 }
// Non-null once a close was requested through the unsafe; further closes chain onto its promise.
117 private DelayedClose delayedClose;
// Set by shutdownInput(...); while true doBeginRead()/doBeginReadNow() return without scheduling anything.
118 private boolean inputClosedSeenErrorOnRead;
119
120
121
122
// Promise of the connect attempt that is currently in flight, or null when there is none.
123 private ChannelPromise connectPromise;
// Timeout task scheduled by connect(...) when the configured connect timeout is positive.
124 private ScheduledFuture<?> connectTimeoutFuture;
// Remote address given to the connect(...) call that is in flight / was performed last.
125 private SocketAddress requestedRemoteAddress;
// Direct memory (and its cleaner) that holds the encoded socket address for a connect operation.
126 private CleanableDirectBuffer cleanable;
127 private ByteBuffer remoteAddressMemory;
// Message header memory allocated by connect(...) for the MSG_FASTOPEN sendmsg path.
128 private MsgHdrMemoryArray msgHdrMemoryArray;
129
// Assigned in doRegister(...) once registration with the event loop succeeded; null before that.
130 private IoRegistration registration;
131
// Cached addresses returned by localAddress0() and remoteAddress0().
132 private volatile SocketAddress local;
133 private volatile SocketAddress remote;
134
135 AbstractIoUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
136 super(parent);
137 this.socket = checkNotNull(socket, "fd");
138
139 if (active) {
140
141
142 this.active = true;
143 this.local = socket.localAddress();
144 this.remote = socket.remoteAddress();
145 }
146
147 logger.trace("Create {} Socket: {}", this instanceof ServerChannel ? "Server" : "Channel", socket.intValue());
148 }
149
/**
 * Creates a channel that is active from the start and whose remote address is already known.
 *
 * @param parent the parent channel, handed to the super constructor
 * @param fd     the socket to use; must not be {@code null}
 * @param remote the remote address to cache as-is
 */
AbstractIoUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    active = true;

    // The remote address is supplied by the caller; only the local one is read from the socket.
    this.remote = remote;
    local = fd.localAddress();
}
160
161
162 final void autoReadCleared() {
163 if (!isRegistered()) {
164 return;
165 }
166 IoRegistration registration = this.registration;
167 if (registration == null || !registration.isValid()) {
168 return;
169 }
170 if (eventLoop().inEventLoop()) {
171 clearRead();
172 } else {
173 eventLoop().execute(this::clearRead);
174 }
175 }
176
177 private void clearRead() {
178 assert eventLoop().inEventLoop();
179 readPending = false;
180 IoRegistration registration = this.registration;
181 if (registration == null || !registration.isValid()) {
182 return;
183 }
184
185 cancelOutstandingReads(registration(), numOutstandingReads);
186 }
187
188
189
190
191
192
193 protected final short nextOpsId() {
194 short id = opsId++;
195
196
197 if (id == 0) {
198 id = opsId++;
199 }
200 return id;
201 }
202
203 public final boolean isOpen() {
204 return socket.isOpen();
205 }
206
207 @Override
208 public boolean isActive() {
209 return active;
210 }
211
212 @Override
213 public final FileDescriptor fd() {
214 return socket;
215 }
216
217 private AbstractUringUnsafe ioUringUnsafe() {
218 return (AbstractUringUnsafe) unsafe();
219 }
220
221 @Override
222 protected boolean isCompatible(final EventLoop loop) {
223 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractUringUnsafe.class);
224 }
225
226 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
227 return newDirectBuffer(buf, buf);
228 }
229
230 protected boolean allowMultiShotPollIn() {
231 return IoUring.isPollAddMultishotEnabled();
232 }
233
234 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
235 final int readableBytes = buf.readableBytes();
236 if (readableBytes == 0) {
237 ReferenceCountUtil.release(holder);
238 return Unpooled.EMPTY_BUFFER;
239 }
240
241 final ByteBufAllocator alloc = alloc();
242 if (alloc.isDirectBufferPooled()) {
243 return newDirectBuffer0(holder, buf, alloc, readableBytes);
244 }
245
246 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
247 if (directBuf == null) {
248 return newDirectBuffer0(holder, buf, alloc, readableBytes);
249 }
250
251 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
252 ReferenceCountUtil.safeRelease(holder);
253 return directBuf;
254 }
255
256 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
257 final ByteBuf directBuf = alloc.directBuffer(capacity);
258 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
259 ReferenceCountUtil.safeRelease(holder);
260 return directBuf;
261 }
262
263
264
265
266
267
268
/**
 * Cancels reads that were scheduled before. Invoked from {@code clearRead()}, {@code cancelOps(...)}
 * and {@code readComplete(...)}.
 *
 * @param registration        the registration of this channel
 * @param numOutstandingReads the current value of the channel's outstanding-read counter
 *                            ({@code readComplete(...)} treats {@code -1} as a multishot read)
 */
269 protected abstract void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads);
270
271
272
273
274
275
276
/**
 * Cancels writes that were scheduled before. Invoked from {@code cancelOps(...)}.
 *
 * @param registration         the registration of this channel
 * @param numOutstandingWrites the current value of the channel's outstanding-write counter
 */
277 protected abstract void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites);
278
279 @Override
280 protected void doDisconnect() throws Exception {
281 }
282
283 private void freeRemoteAddressMemory() {
284 if (remoteAddressMemory != null) {
285 cleanable.clean();
286 cleanable = null;
287 remoteAddressMemory = null;
288 }
289 }
290
291 private void freeMsgHdrArray() {
292 if (msgHdrMemoryArray != null) {
293 msgHdrMemoryArray.release();
294 msgHdrMemoryArray = null;
295 }
296 }
297
298 @Override
299 protected void doClose() throws Exception {
300 active = false;
301
302 if (registration != null) {
303 if (socket.markClosed()) {
304 int fd = fd().intValue();
305 IoUringIoOps ops = IoUringIoOps.newClose(fd, (byte) 0, nextOpsId());
306 registration.submit(ops);
307 }
308 } else {
309
310 socket.close();
311 ioUringUnsafe().freeResourcesNowIfNeeded(null);
312 }
313 }
314
315 @Override
316 protected final void doBeginRead() {
317 if (inputClosedSeenErrorOnRead) {
318
319 return;
320 }
321 if (readPending) {
322
323 return;
324 }
325 readPending = true;
326 if (inReadComplete || !isActive()) {
327
328
329
330 return;
331 }
332 doBeginReadNow();
333 }
334
335 private void doBeginReadNow() {
336 if (inputClosedSeenErrorOnRead) {
337
338 return;
339 }
340 if (!isPollInFirst() ||
341
342
343 socketHasMoreData) {
344
345 ioUringUnsafe().scheduleFirstReadIfNeeded();
346 } else if ((ioState & POLL_IN_SCHEDULED) == 0) {
347 ioUringUnsafe().schedulePollIn();
348 }
349 }
350
351 @Override
352 protected void doWrite(ChannelOutboundBuffer in) {
353 scheduleWriteIfNeeded(in, true);
354 }
355
356 protected void scheduleWriteIfNeeded(ChannelOutboundBuffer in, boolean submitAndRunNow) {
357 if ((ioState & WRITE_SCHEDULED) != 0) {
358 return;
359 }
360 if (scheduleWrite(in) > 0) {
361 ioState |= WRITE_SCHEDULED;
362 if (submitAndRunNow && !isWritable()) {
363 submitAndRunNow();
364 }
365 }
366 }
367
368 protected void submitAndRunNow() {
369
370 }
371
372 private int scheduleWrite(ChannelOutboundBuffer in) {
373 if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
374 return 0;
375 }
376 if (in == null) {
377 return 0;
378 }
379
380 int msgCount = in.size();
381 if (msgCount == 0) {
382 return 0;
383 }
384 Object msg = in.current();
385
386 if (msgCount > 1 && in.current() instanceof ByteBuf) {
387 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
388 } else if (msg instanceof ByteBuf && ((ByteBuf) msg).nioBufferCount() > 1 ||
389 (msg instanceof ByteBufHolder && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
390
391 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
392 } else {
393 numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
394 }
395
396 assert numOutstandingWrites > 0;
397 return numOutstandingWrites;
398 }
399
400 protected final IoRegistration registration() {
401 assert registration != null;
402 return registration;
403 }
404
405 private void schedulePollOut() {
406 pollOutId = schedulePollAdd(POLL_OUT_SCHEDULED, Native.POLLOUT, false);
407 }
408
409 final void schedulePollRdHup() {
410 pollRdhupId = schedulePollAdd(POLL_RDHUP_SCHEDULED, Native.POLLRDHUP, false);
411 }
412
413 private long schedulePollAdd(int ioMask, int mask, boolean multishot) {
414 assert (ioState & ioMask) == 0;
415 int fd = fd().intValue();
416 IoRegistration registration = registration();
417 IoUringIoOps ops = IoUringIoOps.newPollAdd(
418 fd, (byte) 0, mask, multishot ? Native.IORING_POLL_ADD_MULTI : 0, nextOpsId());
419 long id = registration.submit(ops);
420 if (id != 0) {
421 ioState |= (byte) ioMask;
422 }
423 return id;
424 }
425
426 final void resetCachedAddresses() {
427 local = socket.localAddress();
428 remote = socket.remoteAddress();
429 }
430
431 protected abstract class AbstractUringUnsafe extends AbstractUnsafe implements IoUringIoHandle {
// Lazily created by recvBufAllocHandle().
432 private IoUringRecvByteAllocatorHandle allocHandle;
// Set in handle(...) when an IORING_OP_CLOSE completion arrives with a result other than ECANCELED.
433 private boolean closed;
// Makes sure freeResourcesNow(...) is executed at most once.
434 private boolean freed;
// Result of socketIsEmpty(flags) for the read completion that is being processed; reset to false
// when readComplete(...) finishes.
435 private boolean socketIsEmpty;
436
437
438
439
440
/**
 * Schedules a write that may cover several buffers. {@code scheduleWrite(...)} picks this method when the
 * head of the outbound buffer is a {@code ByteBuf} and more than one message is queued, or when
 * the head (or its content) consists of more than one NIO buffer.
 *
 * @param in the outbound buffer to write from
 * @return the number of outstanding writes; the caller stores it (cast to {@code short}) and
 *         asserts that it is positive
 */
441 protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
442
443
444
445
446
/**
 * Schedules a write of a single message. {@code scheduleWrite(...)} picks this method whenever
 * the conditions for {@code scheduleWriteMultiple(...)} are not met.
 *
 * @param msg the current message of the outbound buffer
 * @return the number of outstanding writes; the caller stores it (cast to {@code short}) and
 *         asserts that it is positive
 */
447 protected abstract int scheduleWriteSingle(Object msg);
448
449 @Override
450 public final void handle(IoRegistration registration, IoEvent ioEvent) {
451 IoUringIoEvent event = (IoUringIoEvent) ioEvent;
452 byte op = event.opcode();
453 int res = event.res();
454 int flags = event.flags();
455 short data = event.data();
456 switch (op) {
457 case Native.IORING_OP_RECV:
458 case Native.IORING_OP_ACCEPT:
459 case Native.IORING_OP_RECVMSG:
460 case Native.IORING_OP_READ:
461 readComplete(op, res, flags, data);
462 break;
463 case Native.IORING_OP_WRITEV:
464 case Native.IORING_OP_SEND:
465 case Native.IORING_OP_SENDMSG:
466 case Native.IORING_OP_WRITE:
467 case Native.IORING_OP_SPLICE:
468 case Native.IORING_OP_SEND_ZC:
469 case Native.IORING_OP_SENDMSG_ZC:
470 writeComplete(op, res, flags, data);
471 break;
472 case Native.IORING_OP_POLL_ADD:
473 pollAddComplete(res, flags, data);
474 break;
475 case Native.IORING_OP_ASYNC_CANCEL:
476 cancelComplete0(op, res, flags, data);
477 break;
478 case Native.IORING_OP_CONNECT:
479 connectComplete(op, res, flags, data);
480
481
482 freeMsgHdrArray();
483 freeRemoteAddressMemory();
484 break;
485 case Native.IORING_OP_CLOSE:
486 if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
487 if (delayedClose != null) {
488 delayedClose.promise.setSuccess();
489 }
490 closed = true;
491 }
492 break;
493 default:
494 break;
495 }
496
497
498
499 handleDelayedClosed();
500
501 if (ioState == 0 && closed) {
502 freeResourcesNowIfNeeded(registration);
503 }
504 }
505
506 private void freeResourcesNowIfNeeded(IoRegistration reg) {
507 if (!freed) {
508 freed = true;
509 freeResourcesNow(reg);
510 }
511 }
512
513
514
515
516
517
518 protected void freeResourcesNow(IoRegistration reg) {
519 freeMsgHdrArray();
520 freeRemoteAddressMemory();
521 if (reg != null) {
522 reg.cancel();
523 }
524 }
525
526 private void handleDelayedClosed() {
527 if (delayedClose != null && canCloseNow()) {
528 closeNow();
529 }
530 }
531
532 private void pollAddComplete(int res, int flags, short data) {
533 if ((res & Native.POLLOUT) != 0) {
534 pollOut(res);
535 }
536 if ((res & Native.POLLIN) != 0) {
537 pollIn(res, flags, data);
538 }
539 if ((res & Native.POLLRDHUP) != 0) {
540 pollRdHup(res);
541 }
542 }
543
544 @Override
545 public final void close() throws Exception {
546 close(voidPromise());
547 }
548
/**
 * Requests the channel to be closed. The request is recorded as a {@code DelayedClose}, a pending
 * connect is failed, scheduled operations are cancelled, and the close itself is performed right
 * away only if {@code canCloseNow()} allows it. Otherwise {@code handle(...)} performs it after a
 * later completion.
 *
 * @param promise    notified once the channel was closed
 * @param cause      passed through to the super implementation when the close is executed
 * @param closeCause passed through to the super implementation when the close is executed
 */
549 @Override
550 protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
551 if (closeFuture().isDone()) {
552 // Closed already: only the promise needs to be completed.
553 safeSetSuccess(promise);
554 return;
555 }
556 if (delayedClose == null) {
557 // First close request. A void promise is replaced by a real one because the stored promise
558 // is completed with setSuccess() in handle(...) and gets listeners added for later
559 // close requests (see the else branch).
560 delayedClose = new DelayedClose(promise.isVoid() ? newPromise() : promise, cause, closeCause);
561 } else {
// A close is in progress already: just chain this promise onto the first one.
562 delayedClose.promise.addListener(new PromiseNotifier<>(false, promise));
563 return;
564 }
565
566 boolean cancelConnect = false;
567 try {
568 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
569 if (connectPromise != null) {
570 // A connect is still in flight: fail it and remember to cancel the operation too.
571 connectPromise.tryFailure(new ClosedChannelException());
572 AbstractIoUringChannel.this.connectPromise = null;
573 cancelConnect = true;
574 }
575
576 cancelConnectTimeoutFuture();
577 } finally {
578 // Runs even if the code above throws, so that everything that is still scheduled
579 // gets a cancel submitted.
580 cancelOps(cancelConnect);
581 }
582
583 if (canCloseNow()) {
584 // No read or write is scheduled anymore, so there is nothing to wait for.
585 closeNow();
586 }
587 }
588
589 private void cancelOps(boolean cancelConnect) {
590 if (registration == null || !registration.isValid()) {
591 return;
592 }
593 byte flags = (byte) 0;
594 if ((ioState & POLL_RDHUP_SCHEDULED) != 0 && pollRdhupId != 0) {
595 long id = registration.submit(
596 IoUringIoOps.newAsyncCancel(flags, pollRdhupId, Native.IORING_OP_POLL_ADD));
597 assert id != 0;
598 pollRdhupId = 0;
599 }
600 if ((ioState & POLL_IN_SCHEDULED) != 0 && pollInId != 0) {
601 long id = registration.submit(
602 IoUringIoOps.newAsyncCancel(flags, pollInId, Native.IORING_OP_POLL_ADD));
603 assert id != 0;
604 pollInId = 0;
605 }
606 if ((ioState & POLL_OUT_SCHEDULED) != 0 && pollOutId != 0) {
607 long id = registration.submit(
608 IoUringIoOps.newAsyncCancel(flags, pollOutId, Native.IORING_OP_POLL_ADD));
609 assert id != 0;
610 pollOutId = 0;
611 }
612 if (cancelConnect && connectId != 0) {
613
614 long id = registration.submit(IoUringIoOps.newAsyncCancel(flags, connectId, Native.IORING_OP_CONNECT));
615 assert id != 0;
616 connectId = 0;
617 }
618 cancelOutstandingReads(registration, numOutstandingReads);
619 cancelOutstandingWrites(registration, numOutstandingWrites);
620 }
621
622 private boolean canCloseNow() {
623
624
625 return canCloseNow0() && (ioState & (WRITE_SCHEDULED | READ_SCHEDULED)) == 0;
626 }
627
628 protected boolean canCloseNow0() {
629 return true;
630 }
631
632 private void closeNow() {
633 super.close(newPromise(), delayedClose.cause, delayedClose.closeCause);
634 }
635
636 @Override
637 protected final void flush0() {
638
639
640
641 if ((ioState & POLL_OUT_SCHEDULED) == 0) {
642 super.flush0();
643 }
644 }
645
646 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
647 if (promise == null) {
648
649 return;
650 }
651
652
653 promise.tryFailure(cause);
654 closeIfClosed();
655 }
656
657 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
658 if (promise == null) {
659
660 return;
661 }
662 active = true;
663
664 if (local == null) {
665 local = socket.localAddress();
666 }
667 computeRemote();
668
669
670 schedulePollRdHup();
671
672
673
674 boolean active = isActive();
675
676
677 boolean promiseSet = promise.trySuccess();
678
679
680
681 if (!wasActive && active) {
682 pipeline().fireChannelActive();
683 }
684
685
686 if (!promiseSet) {
687 close(voidPromise());
688 }
689 }
690
691 @Override
692 public final IoUringRecvByteAllocatorHandle recvBufAllocHandle() {
693 if (allocHandle == null) {
694 allocHandle = new IoUringRecvByteAllocatorHandle(
695 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
696 }
697 return allocHandle;
698 }
699
700 final void shutdownInput(boolean allDataRead) {
701 logger.trace("shutdownInput Fd: {}", fd().intValue());
702 if (!socket.isInputShutdown()) {
703 if (isAllowHalfClosure(config())) {
704 try {
705 socket.shutdown(true, false);
706 } catch (IOException ignored) {
707
708
709 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
710 return;
711 } catch (NotYetConnectedException ignore) {
712
713
714 }
715 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
716 } else {
717
718 inputClosedSeenErrorOnRead = true;
719 close(voidPromise());
720 return;
721 }
722 }
723 if (allDataRead && !inputClosedSeenErrorOnRead) {
724 inputClosedSeenErrorOnRead = true;
725 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
726 }
727 }
728
729 private void fireEventAndClose(Object evt) {
730 pipeline().fireUserEventTriggered(evt);
731 close(voidPromise());
732 }
733
734 final void schedulePollIn() {
735 assert (ioState & POLL_IN_SCHEDULED) == 0;
736 if (!isActive() || shouldBreakIoUringInReady(config())) {
737 return;
738 }
739 pollInId = schedulePollAdd(POLL_IN_SCHEDULED, Native.POLLIN, allowMultiShotPollIn());
740 }
741
/**
 * Handles the completion of a read-like operation (RECV, ACCEPT, RECVMSG, READ): updates the
 * read bookkeeping, hands the completion to {@code readComplete0(...)} and afterwards decides
 * whether another read has to be started or outstanding reads have to be cancelled.
 *
 * @param op    the opcode of the completed operation
 * @param res   the result of the completion
 * @param flags the flags of the completion
 * @param data  the user data of the completion
 */
742 private void readComplete(byte op, int res, int flags, short data) {
743 assert numOutstandingReads > 0 || numOutstandingReads == -1 : numOutstandingReads;
744
// An outstanding count of -1 marks a multishot read.
745 boolean multishot = numOutstandingReads == -1;
// IORING_CQE_F_MORE is not set: the read is not considered scheduled any longer.
746 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
747 if (rearm) {
748
749
750 ioState &= ~READ_SCHEDULED;
751 }
// Remember whether a read was pending before the flag is cleared below.
752 boolean pending = readPending;
753 if (multishot) {
754
755
756 readPending = false;
757 } else if (--numOutstandingReads == 0) {
758 // That was the last outstanding completion of the non-multishot read.
759 readPending = false;
760 ioState &= ~READ_SCHEDULED;
761 }
// While this flag is set doBeginRead() only records the request instead of scheduling a read.
762 inReadComplete = true;
763 try {
764 socketIsEmpty = socketIsEmpty(flags);
765 socketHasMoreData = IoUring.isCqeFSockNonEmptySupported() &&
766 (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) != 0;
767 readComplete0(op, res, flags, data, numOutstandingReads);
768 } finally {
769 try {
770
771 if (recvBufAllocHandle().isReadComplete()) {
772 // The allocator handle considers the read loop finished: reset it for the next one.
773 recvBufAllocHandle().reset(config());
774
775
776 if (!multishot) {
777 if (readPending) {
778 // A read was requested again while the completion was processed (doBeginRead()
779 // only sets the flag while inReadComplete is true), so start it now.
780 doBeginReadNow();
781 }
782 } else {
783
784
785
786 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
787 // The multishot read was cancelled. Only start over if a read was pending
788 // when this completion arrived.
789
790
791 if (pending) {
792 doBeginReadNow();
793 }
794 } else if (rearm) {
795 // The multishot read ended without being cancelled: schedule again.
796 doBeginReadNow();
797 } else if (!readPending) {
798 // The multishot read is still armed but nobody asked for more data:
799 // cancel it.
800 cancelOutstandingReads(registration, numOutstandingReads);
801 }
802 }
803 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
804 // Cancelled while the allocator handle does not report the read as complete:
805 // only start over if a read was pending when this completion arrived.
806
807
808 if (pending) {
809 doBeginReadNow();
810 }
811 } else if (multishot && rearm) {
812 // The multishot read ended: schedule again.
813 doBeginReadNow();
814 }
815 } finally {
// Always leave "in read complete" mode and forget the per-completion socket state.
816 inReadComplete = false;
817 socketIsEmpty = false;
818 }
819 }
820 }
821
822
823
824
/**
 * Processes the completion of a read-like operation. Called by {@code readComplete(...)} after it
 * updated its bookkeeping.
 *
 * @param op                   the opcode of the completed operation
 * @param res                  the result of the completion
 * @param flags                the flags of the completion
 * @param data                 the user data of the completion
 * @param outstandingCompletes the channel's outstanding-read counter at the time of the call
 *                             ({@code -1} for a multishot read)
 */
825 protected abstract void readComplete0(byte op, int res, int flags, short data, int outstandingCompletes);
826
827
828
829
830 private void pollRdHup(int res) {
831 ioState &= ~POLL_RDHUP_SCHEDULED;
832 pollRdhupId = 0;
833 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
834 return;
835 }
836
837
838 recvBufAllocHandle().rdHupReceived();
839
840 if (isActive()) {
841 scheduleFirstReadIfNeeded();
842 } else {
843
844 shutdownInput(false);
845 }
846 }
847
848
849
850
851 private void pollIn(int res, int flags, short data) {
852
853 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
854 if (rearm) {
855 ioState &= ~POLL_IN_SCHEDULED;
856 pollInId = 0;
857 }
858 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
859 return;
860 }
861 if (!readPending) {
862
863
864 socketHasMoreData = true;
865 return;
866 }
867 scheduleFirstReadIfNeeded();
868 }
869
870 private void scheduleFirstReadIfNeeded() {
871 if ((ioState & READ_SCHEDULED) == 0) {
872 scheduleFirstRead();
873 }
874 }
875
876 private void scheduleFirstRead() {
877
878 final ChannelConfig config = config();
879 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
880 allocHandle.reset(config);
881 scheduleRead(true);
882 }
883
884 protected final void scheduleRead(boolean first) {
885
886 if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
887 numOutstandingReads = (short) scheduleRead0(first, socketIsEmpty);
888 if (numOutstandingReads > 0 || numOutstandingReads == -1) {
889 ioState |= READ_SCHEDULED;
890 }
891 }
892 }
893
894
895
896
897
898
899
900
901
902
903
/**
 * Schedules the actual read operation(s). Called by {@code scheduleRead(...)}.
 *
 * @param first         {@code true} when called for the first read of a read loop
 * @param socketIsEmpty the value of {@code socketIsEmpty(flags)} for the read completion that is
 *                      currently processed, {@code false} outside of {@code readComplete(...)}
 * @return the number of outstanding reads; the caller stores it (cast to {@code short}) and
 *         marks a read as scheduled if it is positive or {@code -1} (which
 *         {@code readComplete(...)} treats as a multishot read)
 */
904 protected abstract int scheduleRead0(boolean first, boolean socketIsEmpty);
905
906
907
908
909
910
/**
 * Handles the completion of the {@code POLLOUT} poll. While a connect is pending this tries to
 * finish the connect; otherwise it flushes, unless the output side of the socket is shut down.
 *
 * @param res the result of the completion
 */
911 private void pollOut(int res) {
912 ioState &= ~POLL_OUT_SCHEDULED;
913 pollOutId = 0;
914 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
915 return;
916 }
917
918 if (connectPromise != null) {
919 // A connect attempt is pending, so this POLLOUT belongs to it.
920
921
922 assert eventLoop().inEventLoop();
923
924 boolean connectStillInProgress = false;
925 try {
926 boolean wasActive = isActive();
927 if (!socket.finishConnect()) {
// Not connected yet; the finally block below schedules another POLLOUT.
928 connectStillInProgress = true;
929 return;
930 }
931 fulfillConnectPromise(connectPromise, wasActive);
932 } catch (Throwable t) {
933 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
934 } finally {
935 if (!connectStillInProgress) {
936 // The connect attempt is over (successfully or not): the timeout task and the
937 // promise reference are not needed anymore.
938
939 cancelConnectTimeoutFuture();
940 connectPromise = null;
941 } else {
942 // Wait for the next POLLOUT and try to finish the connect again.
943 schedulePollOut();
944 }
945 }
946 } else if (!socket.isOutputShutdown()) {
947 // Bypasses this class' flush0() override and its POLL_OUT_SCHEDULED check.
948 super.flush0();
949 }
950 }
951
952
953
954
955
956
957
958
959
960 private void writeComplete(byte op, int res, int flags, short data) {
961 if ((ioState & CONNECT_SCHEDULED) != 0) {
962
963
964 freeMsgHdrArray();
965 if (res > 0) {
966
967 outboundBuffer().removeBytes(res);
968
969
970 connectComplete(op, 0, flags, data);
971 } else if (res == ERRNO_EINPROGRESS_NEGATIVE || res == 0) {
972
973
974
975
976 submitConnect((InetSocketAddress) requestedRemoteAddress);
977 } else {
978
979 connectComplete(op, res, flags, data);
980 }
981 return;
982 }
983
984 if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
985 assert numOutstandingWrites > 0;
986 --numOutstandingWrites;
987 }
988
989 boolean writtenAll = writeComplete0(op, res, flags, data, numOutstandingWrites);
990 if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
991
992
993 schedulePollOut();
994 }
995
996
997
998 if (numOutstandingWrites == 0) {
999 ioState &= ~WRITE_SCHEDULED;
1000
1001
1002 if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
1003 scheduleWriteIfNeeded(unsafe().outboundBuffer(), false);
1004 }
1005 }
1006 }
1007
1008
1009
1010
1011
1012
1013
1014
1015
/**
 * Processes the completion of a write-like operation. Called by {@code writeComplete(...)}.
 *
 * @param op          the opcode of the completed operation
 * @param res         the result of the completion
 * @param flags       the flags of the completion
 * @param data        the user data of the completion
 * @param outstanding the channel's outstanding-write counter at the time of the call
 * @return {@code false} makes the caller schedule a {@code POLLOUT} (unless one is scheduled);
 *         {@code true} lets it schedule the next write once no write is outstanding
 */
1016 abstract boolean writeComplete0(byte op, int res, int flags, short data, int outstanding);
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026 void cancelComplete0(byte op, int res, int flags, short data) {
1027
1028 }
1029
1030
1031
1032
1033
1034
1035
1036
1037 void connectComplete(byte op, int res, int flags, short data) {
1038 ioState &= ~CONNECT_SCHEDULED;
1039 freeRemoteAddressMemory();
1040
1041 if (res == ERRNO_EINPROGRESS_NEGATIVE || res == ERROR_EALREADY_NEGATIVE) {
1042
1043 schedulePollOut();
1044 } else {
1045 try {
1046 if (res == 0) {
1047 fulfillConnectPromise(connectPromise, active);
1048 if (readPending) {
1049 doBeginReadNow();
1050 }
1051 } else {
1052 try {
1053 Errors.throwConnectException("io_uring connect", res);
1054 } catch (Throwable cause) {
1055 fulfillConnectPromise(connectPromise, cause);
1056 }
1057 }
1058 } finally {
1059
1060
1061
1062 cancelConnectTimeoutFuture();
1063 connectPromise = null;
1064 }
1065 }
1066 }
1067
/**
 * Starts a connect to {@code remoteAddress}, optionally binding to {@code localAddress} first.
 * The operation is only submitted here; {@code promise} is completed later by
 * {@code connectComplete(...)} / {@code pollOut(...)}, by the connect timeout, or by a close.
 * Every failure while preparing the connect fails {@code promise} (annotated with the remote
 * address) instead of being thrown.
 *
 * @param remoteAddress the address to connect to; an {@code InetSocketAddress} or a
 *                      {@code DomainSocketAddress}
 * @param localAddress  the address to bind to before connecting, or {@code null}
 * @param promise       the promise to notify about the outcome
 */
1068 @Override
1069 public void connect(
1070 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
1071
1072 // Nothing to do if the promise is completed already or the channel is not open.
1073 if (promise.isDone() || !ensureOpen(promise)) {
1074 return;
1075 }
1076 // A close was requested already: refuse to connect.
1077 if (delayedClose != null) {
1078 promise.tryFailure(annotateConnectException(new ClosedChannelException(), remoteAddress));
1079 return;
1080 }
1081 try {
1082 if (connectPromise != null) {
1083 throw new ConnectionPendingException();
1084 }
1085 if (localAddress instanceof InetSocketAddress) {
1086 checkResolvable((InetSocketAddress) localAddress);
1087 }
1088
1089 if (remoteAddress instanceof InetSocketAddress) {
1090 checkResolvable((InetSocketAddress) remoteAddress);
1091 }
1092
1093 if (remote != null) {
1094 // A cached remote address means the channel is connected already.
1095
1096
1097 throw new AlreadyConnectedException();
1098 }
1099
1100 if (localAddress != null) {
1101 socket.bind(localAddress);
1102 }
1103
1104 if (remoteAddress instanceof InetSocketAddress) {
1105 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
1106 ByteBuf initialData = null;
1107 if (IoUring.isTcpFastOpenClientSideAvailable() &&
1108 config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT) == Boolean.TRUE) {
// TCP Fast Open: flush what was written so far and use the first message, if it is a
// ByteBuf, as the data that is sent together with the connect.
1109 ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
1110 outbound.addFlush();
1111 Object curr;
1112 if ((curr = outbound.current()) instanceof ByteBuf) {
1113 initialData = (ByteBuf) curr;
1114 }
1115 }
1116 if (initialData != null) {
// Connect by submitting a sendmsg with MSG_FASTOPEN; its completion arrives in
// writeComplete(...) while CONNECT_SCHEDULED is set.
1117 msgHdrMemoryArray = new MsgHdrMemoryArray((short) 1);
1118 MsgHdrMemory hdr = msgHdrMemoryArray.hdr(0);
1119 hdr.set(socket, inetSocketAddress, IoUring.memoryAddress(initialData),
1120 initialData.readableBytes(), (short) 0);
1121
1122 int fd = fd().intValue();
1123 IoRegistration registration = registration();
1124 IoUringIoOps ops = IoUringIoOps.newSendmsg(fd, (byte) 0, Native.MSG_FASTOPEN,
1125 hdr.address(), hdr.idx());
1126 connectId = registration.submit(ops);
1127 if (connectId == 0) {
1128 // Nothing was submitted, so the header memory can be released right away.
1129 freeMsgHdrArray();
1130 }
1131 } else {
1132 submitConnect(inetSocketAddress);
1133 }
1134 } else if (remoteAddress instanceof DomainSocketAddress) {
1135 DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) remoteAddress;
1136 submitConnect(unixDomainSocketAddress);
1137 } else {
// NOTE(review): a java.lang.Error is thrown for an unsupported address type; the
// catch (Throwable) below turns it into a failed promise.
1138 throw new Error("Unexpected SocketAddress implementation " + className(remoteAddress));
1139 }
1140
// NOTE(review): with connectId == 0 the flag stays unset, but the promise is still stored
// below and the timeout is still scheduled.
1141 if (connectId != 0) {
1142 ioState |= CONNECT_SCHEDULED;
1143 }
1144 } catch (Throwable t) {
1145 closeIfClosed();
1146 promise.tryFailure(annotateConnectException(t, remoteAddress));
1147 return;
1148 }
1149 connectPromise = promise;
1150 requestedRemoteAddress = remoteAddress;
1151
1152 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1153 if (connectTimeoutMillis > 0) {
// Fail the promise and close the channel if the connect did not finish in time.
1154 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
1155 @Override
1156 public void run() {
1157 ChannelPromise connectPromise = AbstractIoUringChannel.this.connectPromise;
1158 if (connectPromise != null && !connectPromise.isDone() &&
1159 connectPromise.tryFailure(new ConnectTimeoutException(
1160 "connection timed out: " + remoteAddress))) {
1161 close(voidPromise());
1162 }
1163 }
1164 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1165 }
1166
1167 promise.addListener(new ChannelFutureListener() {
1168 @Override
1169 public void operationComplete(ChannelFuture future) {
1170 // If the caller cancelled the connect: drop the timeout and the promise reference
1171 // and close the channel.
1172 if (future.isCancelled()) {
1173 cancelConnectTimeoutFuture();
1174 connectPromise = null;
1175 close(voidPromise());
1176 }
1177 }
1178 });
1179 }
1180 }
1181
1182 private void submitConnect(InetSocketAddress inetSocketAddress) {
1183 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
1184 remoteAddressMemory = cleanable.buffer();
1185
1186 SockaddrIn.set(socket.isIpv6(), remoteAddressMemory, inetSocketAddress);
1187
1188 int fd = fd().intValue();
1189 IoRegistration registration = registration();
1190 IoUringIoOps ops = IoUringIoOps.newConnect(
1191 fd, (byte) 0, Buffer.memoryAddress(remoteAddressMemory), nextOpsId());
1192 connectId = registration.submit(ops);
1193 if (connectId == 0) {
1194
1195 freeRemoteAddressMemory();
1196 }
1197 }
1198
1199 private void submitConnect(DomainSocketAddress unixDomainSocketAddress) {
1200 cleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_UN);
1201 remoteAddressMemory = cleanable.buffer();
1202 SockaddrIn.setUds(remoteAddressMemory, unixDomainSocketAddress);
1203 int fd = fd().intValue();
1204 IoRegistration registration = registration();
1205 long addr = Buffer.memoryAddress(remoteAddressMemory);
1206 IoUringIoOps ops = IoUringIoOps.newConnect(fd, (byte) 0, addr, Native.SIZEOF_SOCKADDR_UN, nextOpsId());
1207 connectId = registration.submit(ops);
1208 if (connectId == 0) {
1209
1210 freeRemoteAddressMemory();
1211 }
1212 }
1213
1214 @Override
1215 protected Object filterOutboundMessage(Object msg) {
1216 if (msg instanceof ByteBuf) {
1217 ByteBuf buf = (ByteBuf) msg;
1218 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
1219 }
1220 throw new UnsupportedOperationException("unsupported message type");
1221 }
1222
1223 @Override
1224 protected void doRegister(ChannelPromise promise) {
1225 IoEventLoop eventLoop = (IoEventLoop) eventLoop();
1226 eventLoop.register(ioUringUnsafe()).addListener(f -> {
1227 if (f.isSuccess()) {
1228 registration = (IoRegistration) f.getNow();
1229 promise.setSuccess();
1230 } else {
1231 promise.setFailure(f.cause());
1232 }
1233 });
1234 }
1235
1236 @Override
1237 protected final void doDeregister() {
1238
1239 ioUringUnsafe().cancelOps(connectPromise != null);
1240 }
1241
1242 @Override
1243 protected void doBind(final SocketAddress local) throws Exception {
1244 if (local instanceof InetSocketAddress) {
1245 checkResolvable((InetSocketAddress) local);
1246 }
1247 socket.bind(local);
1248 this.local = socket.localAddress();
1249 }
1250
1251 protected static void checkResolvable(InetSocketAddress addr) {
1252 if (addr.isUnresolved()) {
1253 throw new UnresolvedAddressException();
1254 }
1255 }
1256
1257 @Override
1258 protected final SocketAddress localAddress0() {
1259 return local;
1260 }
1261
1262 @Override
1263 protected final SocketAddress remoteAddress0() {
1264 return remote;
1265 }
1266
1267 private static boolean isAllowHalfClosure(ChannelConfig config) {
1268 return config instanceof SocketChannelConfig &&
1269 ((SocketChannelConfig) config).isAllowHalfClosure();
1270 }
1271
1272 private void cancelConnectTimeoutFuture() {
1273 if (connectTimeoutFuture != null) {
1274 connectTimeoutFuture.cancel(false);
1275 connectTimeoutFuture = null;
1276 }
1277 }
1278
1279 private void computeRemote() {
1280 if (requestedRemoteAddress instanceof InetSocketAddress) {
1281 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
1282 }
1283 }
1284
1285 private boolean shouldBreakIoUringInReady(ChannelConfig config) {
1286 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
1287 }
1288
1289
1290
1291
1292
1293
1294
/**
 * Derives from the flags of a read completion whether the socket is empty. The result is kept
 * for the duration of {@code readComplete(...)} and handed to {@code scheduleRead0(...)}.
 *
 * @param flags the flags of the read completion
 * @return the value to record as "socket is empty" for this completion
 */
1295 protected abstract boolean socketIsEmpty(int flags);
1296
/**
 * Controls how {@code doBeginReadNow()} starts reading.
 *
 * @return {@code false} to schedule a read directly; {@code true} to schedule a POLLIN first
 *         unless the socket is already known to hold more data
 */
1297 abstract boolean isPollInFirst();
1298 }