1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.quic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.AbstractChannel;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandler;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.RecvByteBufAllocator;
35 import io.netty.channel.socket.DatagramPacket;
36 import io.netty.handler.ssl.SniCompletionEvent;
37 import io.netty.handler.ssl.SslHandshakeCompletionEvent;
38 import io.netty.util.AttributeKey;
39 import io.netty.util.collection.LongObjectHashMap;
40 import io.netty.util.collection.LongObjectMap;
41 import io.netty.util.concurrent.Future;
42 import io.netty.util.concurrent.ImmediateEventExecutor;
43 import io.netty.util.concurrent.ImmediateExecutor;
44 import io.netty.util.concurrent.Promise;
45 import io.netty.util.internal.StringUtil;
46 import io.netty.util.internal.logging.InternalLogger;
47 import io.netty.util.internal.logging.InternalLoggerFactory;
48 import org.jetbrains.annotations.Nullable;
49
50 import javax.net.ssl.SSLEngine;
51 import javax.net.ssl.SSLHandshakeException;
52 import java.io.File;
53 import java.net.ConnectException;
54 import java.net.InetSocketAddress;
55 import java.net.SocketAddress;
56 import java.nio.BufferUnderflowException;
57 import java.nio.ByteBuffer;
58 import java.nio.channels.AlreadyConnectedException;
59 import java.nio.channels.ClosedChannelException;
60 import java.nio.channels.ConnectionPendingException;
61 import java.util.ArrayList;
62 import java.util.Collections;
63 import java.util.HashSet;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Set;
67 import java.util.concurrent.Executor;
68 import java.util.concurrent.ScheduledFuture;
69 import java.util.concurrent.TimeUnit;
70 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
71 import java.util.function.Consumer;
72 import java.util.function.Function;
73
74
75
76
77 final class QuicheQuicChannel extends AbstractChannel implements QuicChannel {
78 private static final InternalLogger logger = InternalLoggerFactory.getInstance(QuicheQuicChannel.class);
79 private static final String QLOG_FILE_EXTENSION = ".qlog";
80
/**
 * Outcome of a single {@code streamRecv} call.
 */
enum StreamRecvResult {
    /** The native receive call reported {@code QUICHE_ERR_DONE}. */
    DONE,
    /** Data was received and the FIN flag was set by the native call. */
    FIN,
    /** Data was received and the FIN flag was not set. */
    OK
}
95
96 private enum ChannelState {
97 OPEN,
98 ACTIVE,
99 CLOSED
100 }
101
102 private enum SendResult {
103 SOME,
104 NONE,
105 CLOSE
106 }
107
108 private static final class CloseData implements ChannelFutureListener {
109 final boolean applicationClose;
110 final int err;
111 final ByteBuf reason;
112
113 CloseData(boolean applicationClose, int err, ByteBuf reason) {
114 this.applicationClose = applicationClose;
115 this.err = err;
116 this.reason = reason;
117 }
118
119 @Override
120 public void operationComplete(ChannelFuture future) {
121 reason.release();
122 }
123 }
124
125 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
126 private long[] readableStreams = new long[4];
127 private long[] writableStreams = new long[4];
128 private final LongObjectMap<QuicheQuicStreamChannel> streams = new LongObjectHashMap<>();
129 private final QuicheQuicChannelConfig config;
130 private final boolean server;
131 private final QuicStreamIdGenerator idGenerator;
132 private final ChannelHandler streamHandler;
133 private final Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray;
134 private final Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray;
135 private final TimeoutHandler timeoutHandler;
136 private final QuicConnectionIdGenerator connectionIdAddressGenerator;
137 private final QuicResetTokenGenerator resetTokenGenerator;
138 private final Set<ByteBuffer> sourceConnectionIds = new HashSet<>();
139
140 private Consumer<QuicheQuicChannel> freeTask;
141 private Executor sslTaskExecutor;
142 private boolean inFireChannelReadCompleteQueue;
143 private boolean fireChannelReadCompletePending;
144 private ByteBuf finBuffer;
145 private ChannelPromise connectPromise;
146 private ScheduledFuture<?> connectTimeoutFuture;
147 private QuicConnectionAddress connectAddress;
148 private CloseData closeData;
149 private QuicConnectionCloseEvent connectionCloseEvent;
150 private QuicConnectionStats statsAtClose;
151 private boolean supportsDatagram;
152 private boolean recvDatagramPending;
153 private boolean datagramReadable;
154 private boolean recvStreamPending;
155 private boolean streamReadable;
156 private boolean handshakeCompletionNotified;
157 private boolean earlyDataReadyNotified;
158 private int reantranceGuard;
159 private static final int IN_RECV = 1 << 1;
160 private static final int IN_CONNECTION_SEND = 1 << 2;
161 private static final int IN_HANDLE_WRITABLE_STREAMS = 1 << 3;
162 private volatile ChannelState state = ChannelState.OPEN;
163 private volatile boolean timedOut;
164 private volatile String traceId;
165 private volatile QuicheQuicConnection connection;
166 private volatile InetSocketAddress local;
167 private volatile InetSocketAddress remote;
168
169 private final ChannelFutureListener continueSendingListener = f -> {
170 if (connectionSend(connection) != SendResult.NONE) {
171 flushParent();
172 }
173 };
174
175 private static final AtomicLongFieldUpdater<QuicheQuicChannel> UNI_STREAMS_LEFT_UPDATER =
176 AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "uniStreamsLeft");
177 private volatile long uniStreamsLeft;
178
179 private static final AtomicLongFieldUpdater<QuicheQuicChannel> BIDI_STREAMS_LEFT_UPDATER =
180 AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "bidiStreamsLeft");
181 private volatile long bidiStreamsLeft;
182
/**
 * Creates a new channel. Use {@code forClient(...)} or {@code forServer(...)} instead of calling this directly.
 *
 * @param parent                       the parent channel
 * @param server                       {@code true} for the server side of a connection
 * @param key                          initial source connection id to register, or {@code null}
 * @param local                        local socket address
 * @param remote                       remote socket address
 * @param supportsDatagram             whether datagram writes are allowed on this channel
 * @param streamHandler                handler installed on stream channels that reach the pipeline tail
 * @param streamOptionsArray           options applied to those stream channels
 * @param streamAttrsArray             attributes applied to those stream channels
 * @param freeTask                     callback invoked from {@code doClose()} after the connection is freed
 * @param sslTaskExecutor              executor for SSL tasks; {@code null} selects {@link ImmediateExecutor}
 * @param connectionIdAddressGenerator generator used by {@code newSourceConnectionIds()}, may be {@code null}
 * @param resetTokenGenerator          generator used by {@code newSourceConnectionIds()}, may be {@code null}
 */
private QuicheQuicChannel(Channel parent, boolean server, @Nullable ByteBuffer key, InetSocketAddress local,
                          InetSocketAddress remote, boolean supportsDatagram, ChannelHandler streamHandler,
                          Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
                          Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
                          @Nullable Consumer<QuicheQuicChannel> freeTask,
                          @Nullable Executor sslTaskExecutor,
                          @Nullable QuicConnectionIdGenerator connectionIdAddressGenerator,
                          @Nullable QuicResetTokenGenerator resetTokenGenerator) {
    super(parent);
    config = new QuicheQuicChannelConfig(this);

    // Identity and role of this endpoint.
    this.server = server;
    this.idGenerator = new QuicStreamIdGenerator(server);
    this.local = local;
    this.remote = remote;
    this.supportsDatagram = supportsDatagram;
    if (key != null) {
        this.sourceConnectionIds.add(key);
    }

    // Stream channel setup.
    this.streamHandler = streamHandler;
    this.streamOptionsArray = streamOptionsArray;
    this.streamAttrsArray = streamAttrsArray;

    // Collaborators.
    this.connectionIdAddressGenerator = connectionIdAddressGenerator;
    this.resetTokenGenerator = resetTokenGenerator;
    this.freeTask = freeTask;
    timeoutHandler = new TimeoutHandler();
    if (sslTaskExecutor == null) {
        this.sslTaskExecutor = ImmediateExecutor.INSTANCE;
    } else {
        this.sslTaskExecutor = sslTaskExecutor;
    }
}
212
213 static QuicheQuicChannel forClient(Channel parent, InetSocketAddress local, InetSocketAddress remote,
214 ChannelHandler streamHandler,
215 Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
216 Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray) {
217 return new QuicheQuicChannel(parent, false, null, local, remote, false, streamHandler,
218 streamOptionsArray, streamAttrsArray, null, null, null, null);
219 }
220
221 static QuicheQuicChannel forServer(Channel parent, ByteBuffer key, InetSocketAddress local,
222 InetSocketAddress remote,
223 boolean supportsDatagram, ChannelHandler streamHandler,
224 Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
225 Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
226 Consumer<QuicheQuicChannel> freeTask, Executor sslTaskExecutor,
227 QuicConnectionIdGenerator connectionIdAddressGenerator,
228 QuicResetTokenGenerator resetTokenGenerator) {
229 return new QuicheQuicChannel(parent, true, key, local, remote, supportsDatagram,
230 streamHandler, streamOptionsArray, streamAttrsArray, freeTask,
231 sslTaskExecutor, connectionIdAddressGenerator, resetTokenGenerator);
232 }
233
234 private static final int MAX_ARRAY_LEN = 128;
235
236 private static long[] growIfNeeded(long[] array, int maxLength) {
237 if (maxLength > array.length) {
238 if (array.length == MAX_ARRAY_LEN) {
239 return array;
240 }
241
242 return new long[Math.min(MAX_ARRAY_LEN, array.length + 4)];
243 }
244 return array;
245 }
246
247 @Override
248 public boolean isTimedOut() {
249 return timedOut;
250 }
251
252 @Override
253 public SSLEngine sslEngine() {
254 QuicheQuicConnection connection = this.connection;
255 return connection == null ? null : connection.engine();
256 }
257
258 private void notifyAboutHandshakeCompletionIfNeeded(QuicheQuicConnection conn,
259 @Nullable SSLHandshakeException cause) {
260 if (handshakeCompletionNotified) {
261 return;
262 }
263 if (cause != null) {
264 pipeline().fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
265 return;
266 }
267 if (conn.isFreed()) {
268 return;
269 }
270 switch (connection.engine().getHandshakeStatus()) {
271 case NOT_HANDSHAKING:
272 case FINISHED:
273 handshakeCompletionNotified = true;
274 pipeline().fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
275 break;
276 default:
277 break;
278 }
279 }
280
281 @Override
282 public long peerAllowedStreams(QuicStreamType type) {
283 switch (type) {
284 case BIDIRECTIONAL:
285 return bidiStreamsLeft;
286 case UNIDIRECTIONAL:
287 return uniStreamsLeft;
288 default:
289 return 0;
290 }
291 }
292
293 void attachQuicheConnection(QuicheQuicConnection connection) {
294 this.connection = connection;
295
296 byte[] traceId = Quiche.quiche_conn_trace_id(connection.address());
297 if (traceId != null) {
298 this.traceId = new String(traceId);
299 }
300
301 connection.init(local, remote,
302 sniHostname -> pipeline().fireUserEventTriggered(new SniCompletionEvent(sniHostname)));
303
304
305 QLogConfiguration configuration = config.getQLogConfiguration();
306 if (configuration != null) {
307 final String fileName;
308 File file = new File(configuration.path());
309 if (file.isDirectory()) {
310
311 file.mkdir();
312 if (this.traceId != null) {
313 fileName = configuration.path() + File.separatorChar + this.traceId + "-" +
314 id().asShortText() + QLOG_FILE_EXTENSION;
315 } else {
316 fileName = configuration.path() + File.separatorChar + id().asShortText() + QLOG_FILE_EXTENSION;
317 }
318 } else {
319 fileName = configuration.path();
320 }
321
322 if (!Quiche.quiche_conn_set_qlog_path(connection.address(), fileName,
323 configuration.logTitle(), configuration.logDescription())) {
324 logger.info("Unable to create qlog file: {} ", fileName);
325 }
326 }
327 }
328
329 void connectNow(Function<QuicChannel, ? extends QuicSslEngine> engineProvider, Executor sslTaskExecutor,
330 Consumer<QuicheQuicChannel> freeTask, long configAddr, int localConnIdLength,
331 boolean supportsDatagram, ByteBuffer fromSockaddrMemory, ByteBuffer toSockaddrMemory)
332 throws Exception {
333 assert this.connection == null;
334 assert this.traceId == null;
335 assert this.sourceConnectionIds.isEmpty();
336
337 this.sslTaskExecutor = sslTaskExecutor;
338 this.freeTask = freeTask;
339
340 QuicConnectionAddress address = this.connectAddress;
341
342 if (address == QuicConnectionAddress.EPHEMERAL) {
343 address = QuicConnectionAddress.random(localConnIdLength);
344 }
345 ByteBuffer connectId = address.id();
346 if (connectId.remaining() != localConnIdLength) {
347 failConnectPromiseAndThrow(new IllegalArgumentException("connectionAddress has length "
348 + connectId.remaining()
349 + " instead of " + localConnIdLength));
350 }
351 QuicSslEngine engine = engineProvider.apply(this);
352 if (!(engine instanceof QuicheQuicSslEngine)) {
353 failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not of type "
354 + QuicheQuicSslEngine.class.getSimpleName()));
355 return;
356 }
357 if (!engine.getUseClientMode()) {
358 failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not create in client mode"));
359 }
360 QuicheQuicSslEngine quicheEngine = (QuicheQuicSslEngine) engine;
361 ByteBuf idBuffer = alloc().directBuffer(connectId.remaining()).writeBytes(connectId.duplicate());
362 try {
363 int fromSockaddrLen = SockaddrIn.setAddress(fromSockaddrMemory, local);
364 int toSockaddrLen = SockaddrIn.setAddress(toSockaddrMemory, remote);
365 QuicheQuicConnection connection = quicheEngine.createConnection(ssl ->
366 Quiche.quiche_conn_new_with_tls(Quiche.readerMemoryAddress(idBuffer),
367 idBuffer.readableBytes(), -1, -1,
368 Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
369 Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
370 configAddr, ssl, false));
371 if (connection == null) {
372 failConnectPromiseAndThrow(new ConnectException());
373 return;
374 }
375 attachQuicheConnection(connection);
376 QuicClientSessionCache sessionCache = quicheEngine.ctx.getSessionCache();
377 if (sessionCache != null) {
378 byte[] sessionBytes = sessionCache
379 .getSession(quicheEngine.getSession().getPeerHost(), quicheEngine.getSession().getPeerPort());
380 if (sessionBytes != null) {
381 Quiche.quiche_conn_set_session(connection.address(), sessionBytes);
382 }
383 }
384 this.supportsDatagram = supportsDatagram;
385 sourceConnectionIds.add(connectId);
386 } finally {
387 idBuffer.release();
388 }
389 }
390
391 private void failConnectPromiseAndThrow(Exception e) throws Exception {
392 tryFailConnectPromise(e);
393 throw e;
394 }
395
396 private boolean tryFailConnectPromise(Exception e) {
397 ChannelPromise promise = connectPromise;
398 if (promise != null) {
399 connectPromise = null;
400 promise.tryFailure(e);
401 return true;
402 }
403 return false;
404 }
405
406 Set<ByteBuffer> sourceConnectionIds() {
407 return sourceConnectionIds;
408 }
409
410 boolean markInFireChannelReadCompleteQueue() {
411 if (inFireChannelReadCompleteQueue) {
412 return false;
413 }
414 inFireChannelReadCompleteQueue = true;
415 return true;
416 }
417
418 private void failPendingConnectPromise() {
419 ChannelPromise promise = QuicheQuicChannel.this.connectPromise;
420 if (promise != null) {
421 QuicheQuicChannel.this.connectPromise = null;
422 promise.tryFailure(new QuicClosedChannelException(this.connectionCloseEvent));
423 }
424 }
425
426 void forceClose() {
427 unsafe().close(voidPromise());
428 }
429
430 @Override
431 protected DefaultChannelPipeline newChannelPipeline() {
432 return new DefaultChannelPipeline(this) {
433 @Override
434 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
435 if (msg instanceof QuicStreamChannel) {
436 QuicStreamChannel channel = (QuicStreamChannel) msg;
437 Quic.setupChannel(channel, streamOptionsArray, streamAttrsArray, streamHandler, logger);
438 ctx.channel().eventLoop().register(channel);
439 } else {
440 super.onUnhandledInboundMessage(ctx, msg);
441 }
442 }
443 };
444 }
445
446 @Override
447 public QuicChannel flush() {
448 super.flush();
449 return this;
450 }
451
452 @Override
453 public QuicChannel read() {
454 super.read();
455 return this;
456 }
457
458 @Override
459 public Future<QuicStreamChannel> createStream(QuicStreamType type, @Nullable ChannelHandler handler,
460 Promise<QuicStreamChannel> promise) {
461 if (eventLoop().inEventLoop()) {
462 ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise);
463 } else {
464 eventLoop().execute(() -> ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise));
465 }
466 return promise;
467 }
468
469 @Override
470 public ChannelFuture close(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
471 if (eventLoop().inEventLoop()) {
472 close0(applicationClose, error, reason, promise);
473 } else {
474 eventLoop().execute(() -> close0(applicationClose, error, reason, promise));
475 }
476 return promise;
477 }
478
479 private void close0(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
480 if (closeData == null) {
481 if (!reason.hasMemoryAddress()) {
482
483 ByteBuf copy = alloc().directBuffer(reason.readableBytes()).writeBytes(reason);
484 reason.release();
485 reason = copy;
486 }
487 closeData = new CloseData(applicationClose, error, reason);
488 promise.addListener(closeData);
489 } else {
490
491 reason.release();
492 }
493 close(promise);
494 }
495
496 @Override
497 public String toString() {
498 String traceId = this.traceId;
499 if (traceId == null) {
500 return "()" + super.toString();
501 } else {
502 return '(' + traceId + ')' + super.toString();
503 }
504 }
505
506 @Override
507 protected AbstractUnsafe newUnsafe() {
508 return new QuicChannelUnsafe();
509 }
510
511 @Override
512 protected boolean isCompatible(EventLoop eventLoop) {
513 return parent().eventLoop() == eventLoop;
514 }
515
516 @Override
517 @Nullable
518 protected QuicConnectionAddress localAddress0() {
519 QuicheQuicConnection connection = this.connection;
520 return connection == null ? null : connection.sourceId();
521 }
522
523 @Override
524 @Nullable
525 protected QuicConnectionAddress remoteAddress0() {
526 QuicheQuicConnection connection = this.connection;
527 return connection == null ? null : connection.destinationId();
528 }
529
530 @Override
531 @Nullable
532 public QuicConnectionAddress localAddress() {
533
534 return localAddress0();
535 }
536
537 @Override
538 @Nullable
539 public QuicConnectionAddress remoteAddress() {
540
541 return remoteAddress0();
542 }
543
544 @Override
545 @Nullable
546 public SocketAddress localSocketAddress() {
547 return local;
548 }
549
550 @Override
551 @Nullable
552 public SocketAddress remoteSocketAddress() {
553 return remote;
554 }
555
556 @Override
557 protected void doBind(SocketAddress socketAddress) {
558 throw new UnsupportedOperationException();
559 }
560
561 @Override
562 protected void doDisconnect() throws Exception {
563 doClose();
564 }
565
566 @Override
567 protected void doClose() throws Exception {
568 if (state == ChannelState.CLOSED) {
569 return;
570 }
571 state = ChannelState.CLOSED;
572
573 QuicheQuicConnection conn = this.connection;
574 if (conn == null || conn.isFreed()) {
575 if (closeData != null) {
576 closeData.reason.release();
577 closeData = null;
578 }
579 failPendingConnectPromise();
580 return;
581 }
582
583
584 SendResult sendResult = connectionSend(conn);
585
586 final boolean app;
587 final int err;
588 final ByteBuf reason;
589 if (closeData == null) {
590 app = false;
591 err = 0;
592 reason = Unpooled.EMPTY_BUFFER;
593 } else {
594 app = closeData.applicationClose;
595 err = closeData.err;
596 reason = closeData.reason;
597 closeData = null;
598 }
599
600 failPendingConnectPromise();
601 try {
602 int res = Quiche.quiche_conn_close(conn.address(), app, err,
603 Quiche.readerMemoryAddress(reason), reason.readableBytes());
604 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
605 throw Quiche.convertToException(res);
606 }
607
608
609
610
611 if (connectionSend(conn) == SendResult.SOME) {
612 sendResult = SendResult.SOME;
613 }
614 } finally {
615
616
617
618 statsAtClose = collectStats0(conn, eventLoop().newPromise());
619 try {
620 timedOut = Quiche.quiche_conn_is_timed_out(conn.address());
621
622 closeStreams();
623 if (finBuffer != null) {
624 finBuffer.release();
625 finBuffer = null;
626 }
627 } finally {
628 if (sendResult == SendResult.SOME) {
629
630 forceFlushParent();
631 } else {
632 flushParent();
633 }
634 conn.free();
635 if (freeTask != null) {
636 freeTask.accept(this);
637 }
638 timeoutHandler.cancel();
639
640 local = null;
641 remote = null;
642 }
643 }
644 }
645
646 @Override
647 protected void doBeginRead() {
648 recvDatagramPending = true;
649 recvStreamPending = true;
650 if (datagramReadable || streamReadable) {
651 ((QuicChannelUnsafe) unsafe()).recv();
652 }
653 }
654
655 @Override
656 protected Object filterOutboundMessage(Object msg) {
657 if (msg instanceof ByteBuf) {
658 return msg;
659 }
660 throw new UnsupportedOperationException("Unsupported message type: " + StringUtil.simpleClassName(msg));
661 }
662
663 @Override
664 protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) throws Exception {
665 if (!supportsDatagram) {
666 throw new UnsupportedOperationException("Datagram extension is not supported");
667 }
668 boolean sendSomething = false;
669 boolean retry = false;
670 QuicheQuicConnection conn = connection;
671 try {
672 for (;;) {
673 ByteBuf buffer = (ByteBuf) channelOutboundBuffer.current();
674 if (buffer == null) {
675 break;
676 }
677
678 int readable = buffer.readableBytes();
679 if (readable == 0) {
680
681 channelOutboundBuffer.remove();
682 continue;
683 }
684
685 final int res;
686 if (!buffer.isDirect() || buffer.nioBufferCount() > 1) {
687 ByteBuf tmpBuffer = alloc().directBuffer(readable);
688 try {
689 tmpBuffer.writeBytes(buffer, buffer.readerIndex(), readable);
690 res = sendDatagram(conn, tmpBuffer);
691 } finally {
692 tmpBuffer.release();
693 }
694 } else {
695 res = sendDatagram(conn, buffer);
696 }
697 if (res >= 0) {
698 channelOutboundBuffer.remove();
699 sendSomething = true;
700 retry = false;
701 } else {
702 if (res == Quiche.QUICHE_ERR_BUFFER_TOO_SHORT) {
703 retry = false;
704 channelOutboundBuffer.remove(new BufferUnderflowException());
705 } else if (res == Quiche.QUICHE_ERR_INVALID_STATE) {
706 throw new UnsupportedOperationException("Remote peer does not support Datagram extension");
707 } else if (res == Quiche.QUICHE_ERR_DONE) {
708 if (retry) {
709
710 for (;;) {
711 if (!channelOutboundBuffer.remove()) {
712
713 return;
714 }
715 }
716 }
717
718 sendSomething = false;
719
720
721 if (connectionSend(conn) != SendResult.NONE) {
722 forceFlushParent();
723 }
724
725 retry = true;
726 } else {
727 throw Quiche.convertToException(res);
728 }
729 }
730 }
731 } finally {
732 if (sendSomething && connectionSend(conn) != SendResult.NONE) {
733 flushParent();
734 }
735 }
736 }
737
738 private static int sendDatagram(QuicheQuicConnection conn, ByteBuf buf) throws ClosedChannelException {
739 return Quiche.quiche_conn_dgram_send(connectionAddressChecked(conn),
740 Quiche.readerMemoryAddress(buf), buf.readableBytes());
741 }
742
743 @Override
744 public QuicChannelConfig config() {
745 return config;
746 }
747
748 @Override
749 public boolean isOpen() {
750 return state != ChannelState.CLOSED;
751 }
752
753 @Override
754 public boolean isActive() {
755 return state == ChannelState.ACTIVE;
756 }
757
758 @Override
759 public ChannelMetadata metadata() {
760 return METADATA;
761 }
762
763
764
765
766
767 private void flushParent() {
768 if (!inFireChannelReadCompleteQueue) {
769 forceFlushParent();
770 }
771 }
772
773
774
775
776 private void forceFlushParent() {
777 parent().flush();
778 }
779
780 private static long connectionAddressChecked(@Nullable QuicheQuicConnection conn) throws ClosedChannelException {
781 if (conn == null || conn.isFreed()) {
782 throw new ClosedChannelException();
783 }
784 return conn.address();
785 }
786
787 boolean freeIfClosed() {
788 QuicheQuicConnection conn = connection;
789 if (conn == null || conn.isFreed()) {
790 return true;
791 }
792 if (conn.isClosed()) {
793 unsafe().close(newPromise());
794 return true;
795 }
796 return false;
797 }
798
799 private void closeStreams() {
800 if (streams.isEmpty()) {
801 return;
802 }
803 final ClosedChannelException closedChannelException;
804 if (isTimedOut()) {
805
806 closedChannelException = new QuicTimeoutClosedChannelException();
807 } else {
808 closedChannelException = new ClosedChannelException();
809 }
810
811
812 for (QuicheQuicStreamChannel stream: streams.values().toArray(new QuicheQuicStreamChannel[0])) {
813 stream.unsafe().close(closedChannelException, voidPromise());
814 }
815 streams.clear();
816 }
817
818 void streamPriority(long streamId, byte priority, boolean incremental) throws Exception {
819 int res = Quiche.quiche_conn_stream_priority(connectionAddressChecked(connection), streamId,
820 priority, incremental);
821 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
822 throw Quiche.convertToException(res);
823 }
824 }
825
826 void streamClosed(long streamId) {
827 streams.remove(streamId);
828 }
829
830 boolean isStreamLocalCreated(long streamId) {
831 return (streamId & 0x1) == (server ? 1 : 0);
832 }
833
834 QuicStreamType streamType(long streamId) {
835 return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
836 }
837
838 void streamShutdown(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
839 QuicheQuicConnection conn = this.connection;
840 final long connectionAddress;
841 try {
842 connectionAddress = connectionAddressChecked(conn);
843 } catch (ClosedChannelException e) {
844 promise.setFailure(e);
845 return;
846 }
847 int res = 0;
848 if (read) {
849 res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_READ, err);
850 }
851 if (write) {
852 res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_WRITE, err);
853 }
854
855
856
857
858
859 if (connectionSend(conn) != SendResult.NONE) {
860
861 forceFlushParent();
862 }
863 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
864 promise.setFailure(Quiche.convertToException(res));
865 } else {
866 promise.setSuccess();
867 }
868 }
869
870 void streamSendFin(long streamId) throws Exception {
871 QuicheQuicConnection conn = connection;
872 try {
873
874 int res = streamSend0(conn, streamId, Unpooled.EMPTY_BUFFER, true);
875 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
876 throw Quiche.convertToException(res);
877 }
878 } finally {
879
880
881
882
883 if (connectionSend(conn) != SendResult.NONE) {
884 flushParent();
885 }
886 }
887 }
888
889 int streamSend(long streamId, ByteBuf buffer, boolean fin) throws ClosedChannelException {
890 QuicheQuicConnection conn = connection;
891 if (buffer.nioBufferCount() == 1) {
892 return streamSend0(conn, streamId, buffer, fin);
893 }
894 ByteBuffer[] nioBuffers = buffer.nioBuffers();
895 int lastIdx = nioBuffers.length - 1;
896 int res = 0;
897 for (int i = 0; i < lastIdx; i++) {
898 ByteBuffer nioBuffer = nioBuffers[i];
899 while (nioBuffer.hasRemaining()) {
900 int localRes = streamSend(conn, streamId, nioBuffer, false);
901 if (localRes <= 0) {
902 return res;
903 }
904 res += localRes;
905
906 nioBuffer.position(nioBuffer.position() + localRes);
907 }
908 }
909 int localRes = streamSend(conn, streamId, nioBuffers[lastIdx], fin);
910 if (localRes > 0) {
911 res += localRes;
912 }
913 return res;
914 }
915
916 void connectionSendAndFlush() {
917 if (inFireChannelReadCompleteQueue || (reantranceGuard & IN_HANDLE_WRITABLE_STREAMS) != 0) {
918 return;
919 }
920 if (connectionSend(connection) != SendResult.NONE) {
921 flushParent();
922 }
923 }
924
925 private int streamSend0(QuicheQuicConnection conn, long streamId, ByteBuf buffer, boolean fin)
926 throws ClosedChannelException {
927 return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
928 Quiche.readerMemoryAddress(buffer), buffer.readableBytes(), fin);
929 }
930
931 private int streamSend(QuicheQuicConnection conn, long streamId, ByteBuffer buffer, boolean fin)
932 throws ClosedChannelException {
933 return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
934 Quiche.memoryAddressWithPosition(buffer), buffer.remaining(), fin);
935 }
936
937 StreamRecvResult streamRecv(long streamId, ByteBuf buffer) throws Exception {
938 QuicheQuicConnection conn = connection;
939 long connAddr = connectionAddressChecked(conn);
940 if (finBuffer == null) {
941 finBuffer = alloc().directBuffer(1);
942 }
943 int writerIndex = buffer.writerIndex();
944 int recvLen = Quiche.quiche_conn_stream_recv(connAddr, streamId,
945 Quiche.writerMemoryAddress(buffer), buffer.writableBytes(), Quiche.writerMemoryAddress(finBuffer));
946 if (recvLen == Quiche.QUICHE_ERR_DONE) {
947 return StreamRecvResult.DONE;
948 } else if (recvLen < 0) {
949 throw Quiche.convertToException(recvLen);
950 }
951 buffer.writerIndex(writerIndex + recvLen);
952 return finBuffer.getBoolean(0) ? StreamRecvResult.FIN : StreamRecvResult.OK;
953 }
954
955
956
957
958 void recv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
959 ((QuicChannelUnsafe) unsafe()).connectionRecv(sender, recipient, buffer);
960 }
961
962
963
964
965
966
967 List<ByteBuffer> retiredSourceConnectionId() {
968 QuicheQuicConnection connection = this.connection;
969 if (connection == null || connection.isFreed()) {
970 return Collections.emptyList();
971 }
972 long connAddr = connection.address();
973 assert connAddr != -1;
974 List<ByteBuffer> retiredSourceIds = null;
975 for (;;) {
976 byte[] retired = Quiche.quiche_conn_retired_scid_next(connAddr);
977 if (retired == null) {
978 break;
979 }
980 if (retiredSourceIds == null) {
981 retiredSourceIds = new ArrayList<>();
982 }
983 ByteBuffer retiredId = ByteBuffer.wrap(retired);
984 retiredSourceIds.add(retiredId);
985 sourceConnectionIds.remove(retiredId);
986 }
987 if (retiredSourceIds == null) {
988 return Collections.emptyList();
989 }
990 return retiredSourceIds;
991 }
992
993 List<ByteBuffer> newSourceConnectionIds() {
994 if (connectionIdAddressGenerator != null && resetTokenGenerator != null) {
995 QuicheQuicConnection connection = this.connection;
996 if (connection == null || connection.isFreed()) {
997 return Collections.emptyList();
998 }
999 long connAddr = connection.address();
1000
1001
1002 int left = Quiche.quiche_conn_scids_left(connAddr);
1003 if (left > 0) {
1004 QuicConnectionAddress sourceAddr = connection.sourceId();
1005 if (sourceAddr == null) {
1006 return Collections.emptyList();
1007 }
1008 List<ByteBuffer> generatedIds = new ArrayList<>(left);
1009 boolean sendAndFlush = false;
1010 ByteBuffer key = sourceAddr.id();
1011 ByteBuf connIdBuffer = alloc().directBuffer(key.remaining());
1012
1013 byte[] resetTokenArray = new byte[Quic.RESET_TOKEN_LEN];
1014 try {
1015 do {
1016 ByteBuffer srcId = connectionIdAddressGenerator.newId(key.duplicate(), key.remaining())
1017 .asReadOnlyBuffer();
1018 connIdBuffer.clear();
1019 connIdBuffer.writeBytes(srcId.duplicate());
1020 ByteBuffer resetToken = resetTokenGenerator.newResetToken(srcId.duplicate());
1021 resetToken.get(resetTokenArray);
1022 long result = Quiche.quiche_conn_new_scid(
1023 connAddr, Quiche.memoryAddress(connIdBuffer, 0, connIdBuffer.readableBytes()),
1024 connIdBuffer.readableBytes(), resetTokenArray, false, -1);
1025 if (result < 0) {
1026 break;
1027 }
1028 sendAndFlush = true;
1029 generatedIds.add(srcId.duplicate());
1030 sourceConnectionIds.add(srcId);
1031 } while (--left > 0);
1032 } finally {
1033 connIdBuffer.release();
1034 }
1035
1036 if (sendAndFlush) {
1037 connectionSendAndFlush();
1038 }
1039 return generatedIds;
1040 }
1041 }
1042 return Collections.emptyList();
1043 }
1044
1045 void writable() {
1046 QuicheQuicConnection conn = connection;
1047 SendResult result = connectionSend(conn);
1048 handleWritableStreams(conn);
1049 if (connectionSend(conn) == SendResult.SOME) {
1050 result = SendResult.SOME;
1051 }
1052 if (result == SendResult.SOME) {
1053
1054 forceFlushParent();
1055 }
1056 freeIfClosed();
1057 }
1058
1059 int streamCapacity(long streamId) {
1060 QuicheQuicConnection conn = connection;
1061 if (conn.isClosed()) {
1062 return 0;
1063 }
1064 return Quiche.quiche_conn_stream_capacity(conn.address(), streamId);
1065 }
1066
1067 private boolean handleWritableStreams(QuicheQuicConnection conn) {
1068 if (conn.isFreed()) {
1069 return false;
1070 }
1071 reantranceGuard |= IN_HANDLE_WRITABLE_STREAMS;
1072 try {
1073 long connAddr = conn.address();
1074 boolean mayNeedWrite = false;
1075
1076 if (Quiche.quiche_conn_is_established(connAddr) ||
1077 Quiche.quiche_conn_is_in_early_data(connAddr)) {
1078 long writableIterator = Quiche.quiche_conn_writable(connAddr);
1079
1080 int totalWritable = 0;
1081 try {
1082
1083 for (;;) {
1084 int writable = Quiche.quiche_stream_iter_next(
1085 writableIterator, writableStreams);
1086 for (int i = 0; i < writable; i++) {
1087 long streamId = writableStreams[i];
1088 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1089 if (streamChannel != null) {
1090 int capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
1091 if (streamChannel.writable(capacity)) {
1092 mayNeedWrite = true;
1093 }
1094 }
1095 }
1096 if (writable > 0) {
1097 totalWritable += writable;
1098 }
1099 if (writable < writableStreams.length) {
1100
1101 break;
1102 }
1103 }
1104 } finally {
1105 Quiche.quiche_stream_iter_free(writableIterator);
1106 }
1107 writableStreams = growIfNeeded(writableStreams, totalWritable);
1108 }
1109 return mayNeedWrite;
1110 } finally {
1111 reantranceGuard &= ~IN_HANDLE_WRITABLE_STREAMS;
1112 }
1113 }
1114
1115
1116
1117
1118
1119
1120 void recvComplete() {
1121 try {
1122 QuicheQuicConnection conn = connection;
1123 if (conn.isFreed()) {
1124
1125 forceFlushParent();
1126 return;
1127 }
1128 fireChannelReadCompleteIfNeeded();
1129
1130
1131
1132 connectionSend(conn);
1133
1134
1135 forceFlushParent();
1136 freeIfClosed();
1137 } finally {
1138 inFireChannelReadCompleteQueue = false;
1139 }
1140 }
1141
1142 private void fireChannelReadCompleteIfNeeded() {
1143 if (fireChannelReadCompletePending) {
1144 fireChannelReadCompletePending = false;
1145 pipeline().fireChannelReadComplete();
1146 }
1147 }
1148
1149 private void fireExceptionEvents(QuicheQuicConnection conn, Throwable cause) {
1150 if (cause instanceof SSLHandshakeException) {
1151 notifyAboutHandshakeCompletionIfNeeded(conn, (SSLHandshakeException) cause);
1152 }
1153 pipeline().fireExceptionCaught(cause);
1154 }
1155
1156 private boolean runTasksDirectly() {
1157 return sslTaskExecutor == null || sslTaskExecutor == ImmediateExecutor.INSTANCE ||
1158 sslTaskExecutor == ImmediateEventExecutor.INSTANCE;
1159 }
1160
1161 private void runAllTaskSend(QuicheQuicConnection conn, Runnable task) {
1162 sslTaskExecutor.execute(decorateTaskSend(conn, task));
1163 }
1164
1165 private void runAll(QuicheQuicConnection conn, Runnable task) {
1166 do {
1167 task.run();
1168 } while ((task = conn.sslTask()) != null);
1169 }
1170
1171 private Runnable decorateTaskSend(QuicheQuicConnection conn, Runnable task) {
1172 return () -> {
1173 try {
1174 runAll(conn, task);
1175 } finally {
1176
1177 eventLoop().execute(() -> {
1178
1179 if (connectionSend(conn) != SendResult.NONE) {
1180 forceFlushParent();
1181 }
1182 freeIfClosed();
1183 });
1184 }
1185 };
1186 }
1187
/**
 * Pulls packets out of quiche via {@code quiche_conn_send} and writes them to the parent channel, collecting
 * consecutive packets in a list so several of them can be handed over as one segmented datagram packet.
 * <p>
 * The collected packets are written out when quiche reports a negative result (done or error), when the send
 * info changed, when the newest packet has a different size than the last collected one, or when the number
 * of collected packets reached {@code segmentedDatagramPacketAllocator.maxNumSegments()}.
 *
 * @param conn the connection to send for.
 * @param segmentedDatagramPacketAllocator creates the segmented packets and bounds the number of segments.
 * @return {@code SendResult.NONE} if nothing was written to the parent or the connection is closed,
 *         {@code SendResult.SOME} if at least one packet was written, or {@code SendResult.CLOSE} if quiche
 *         returned an error for which {@code Quiche.shouldClose(...)} is {@code true}.
 */
private SendResult connectionSendSegments(QuicheQuicConnection conn,
                                          SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
    if (conn.isClosed()) {
        return SendResult.NONE;
    }
    // Packets collected so far that were not yet written to the parent channel.
    List<ByteBuf> bufferList = new ArrayList<>(segmentedDatagramPacketAllocator.maxNumSegments());
    long connAddr = conn.address();
    int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
    SendResult sendResult = SendResult.NONE;
    boolean close = false;
    try {
        for (;;) {
            int len = calculateSendBufferLength(connAddr, maxDatagramSize);
            ByteBuf out = alloc().directBuffer(len);

            ByteBuffer sendInfo = conn.nextSendInfo();
            // Captured before the send call: the collected packets are written to this address even if
            // the cached remote address is replaced further down.
            InetSocketAddress sendToAddress = this.remote;

            int writerIndex = out.writerIndex();
            int written = Quiche.quiche_conn_send(
                    connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
                    Quiche.memoryAddressWithPosition(sendInfo));
            if (written == 0) {
                out.release();
                // Nothing was produced into this buffer; drop it and ask quiche again.
                continue;
            }
            final boolean done;
            if (written < 0) {
                done = true;
                if (written != Quiche.QUICHE_ERR_DONE) {
                    close = Quiche.shouldClose(written);
                    Exception e = Quiche.convertToException(written);
                    if (!tryFailConnectPromise(e)) {
                        // The connect promise was not failed with it, so report it through the pipeline.
                        fireExceptionEvents(conn, e);
                    }
                }
            } else {
                done = false;
            }
            int size = bufferList.size();
            if (done) {
                // The current buffer holds no packet: release it and write what was collected so far.
                out.release();

                switch (size) {
                    case 0:
                        // Nothing was collected, nothing to write.
                        break;
                    case 1:
                        // A single packet is written as a plain datagram packet.
                        parent().write(new DatagramPacket(bufferList.get(0), sendToAddress));
                        sendResult = SendResult.SOME;
                        break;
                    default:
                        int segmentSize = segmentSize(bufferList);
                        ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
                        // More than one packet: wrap them all into one segmented packet.
                        parent().write(segmentedDatagramPacketAllocator.newPacket(
                                compositeBuffer, segmentSize, sendToAddress));
                        sendResult = SendResult.SOME;
                        break;
                }
                bufferList.clear();
                if (close) {
                    sendResult = SendResult.CLOSE;
                }
                return sendResult;
            }
            out.writerIndex(writerIndex + written);

            // Stays -1 as long as the collected packets do not have to be written out yet.
            int segmentSize = -1;
            if (conn.isSendInfoChanged()) {
                // Update the cached addresses from the send info.
                remote = QuicheSendInfo.getToAddress(sendInfo);
                local = QuicheSendInfo.getFromAddress(sendInfo);

                if (size > 0) {
                    // Packets were collected already; they need to be written now, so pick the
                    // segment size of the collected list.
                    segmentSize = segmentSize(bufferList);
                }
            } else if (size > 0) {
                int lastReadable = segmentSize(bufferList);
                // Write out now if the new packet has a different size than the last collected one or
                // if the maximum number of segments is reached.
                if (lastReadable != out.readableBytes() ||
                        size == segmentedDatagramPacketAllocator.maxNumSegments()) {
                    segmentSize = lastReadable;
                }
            }

            // A segment size other than -1 means the collected packets are written out now.
            if (segmentSize != -1) {
                final boolean stop;
                if (size == 1) {
                    // Only one packet collected, no need for a segmented packet.
                    stop = writePacket(new DatagramPacket(
                            bufferList.get(0), sendToAddress), maxDatagramSize, len);
                } else {
                    // Wrap all collected packets into one segmented packet.
                    ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
                    stop = writePacket(segmentedDatagramPacketAllocator.newPacket(
                            compositeBuffer, segmentSize, sendToAddress), maxDatagramSize, len);
                }
                bufferList.clear();
                sendResult = SendResult.SOME;

                if (stop) {
                    // writePacket(...) asked to stop for now. The packet that was just produced still has
                    // to be written (or released if empty) as it would otherwise leak.
                    // NOTE(review): this uses sendToAddress captured before a possible send info change
                    // above — confirm this is the intended destination for 'out'.
                    if (out.isReadable()) {
                        parent().write(new DatagramPacket(out, sendToAddress));
                    } else {
                        out.release();
                    }
                    if (close) {
                        sendResult = SendResult.CLOSE;
                    }
                    return sendResult;
                }
            }

            // Record the list as a hint on the buffer (helps when tracking down leaks).
            out.touch(bufferList);
            // Keep the packet for later so it can become part of a segmented packet.
            bufferList.add(out);
        }
    } finally {
        // NOOP
    }
}
1322
1323 private static int segmentSize(List<ByteBuf> bufferList) {
1324 assert !bufferList.isEmpty();
1325 int size = bufferList.size();
1326 return bufferList.get(size - 1).readableBytes();
1327 }
1328
/**
 * Pulls packets out of quiche via {@code quiche_conn_send} and writes each of them as its own
 * {@link DatagramPacket} to the parent channel, until quiche returns a negative result or
 * {@code writePacket(...)} asks to stop.
 *
 * @param conn the connection to send for.
 * @return {@code SendResult.NONE} if nothing was written or the connection is closed,
 *         {@code SendResult.SOME} if at least one packet was written, or {@code SendResult.CLOSE} if quiche
 *         returned an error for which {@code Quiche.shouldClose(...)} is {@code true}.
 */
private SendResult connectionSendSimple(QuicheQuicConnection conn) {
    if (conn.isClosed()) {
        return SendResult.NONE;
    }
    long connAddr = conn.address();
    SendResult sendResult = SendResult.NONE;
    boolean close = false;
    int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
    for (;;) {
        ByteBuffer sendInfo = conn.nextSendInfo();

        int len = calculateSendBufferLength(connAddr, maxDatagramSize);
        ByteBuf out = alloc().directBuffer(len);
        int writerIndex = out.writerIndex();

        int written = Quiche.quiche_conn_send(
                connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
                Quiche.memoryAddressWithPosition(sendInfo));

        if (written == 0) {
            // Nothing was produced into this buffer; release it and ask quiche again.
            out.release();
            continue;
        }
        if (written < 0) {
            // No packet in the buffer (done or error), so it is not handed to anyone.
            out.release();
            if (written != Quiche.QUICHE_ERR_DONE) {
                close = Quiche.shouldClose(written);

                Exception e = Quiche.convertToException(written);
                // Only report through the pipeline if it did not fail the connect promise.
                if (!tryFailConnectPromise(e)) {
                    fireExceptionEvents(conn, e);
                }
            }
            break;
        }
        if (conn.isSendInfoChanged()) {
            // Update the cached addresses from the send info before writing to 'remote'.
            remote = QuicheSendInfo.getToAddress(sendInfo);
            local = QuicheSendInfo.getFromAddress(sendInfo);
        }
        out.writerIndex(writerIndex + written);
        boolean stop = writePacket(new DatagramPacket(out, remote), maxDatagramSize, len);
        sendResult = SendResult.SOME;
        if (stop) {
            // writePacket(...) attached the continue-sending listener; resume later.
            break;
        }
    }
    if (close) {
        sendResult = SendResult.CLOSE;
    }
    return sendResult;
}
1383
1384 private boolean writePacket(DatagramPacket packet, int maxDatagramSize, int len) {
1385 ChannelFuture future = parent().write(packet);
1386 if (isSendWindowUsed(maxDatagramSize, len)) {
1387
1388 future.addListener(continueSendingListener);
1389 return true;
1390 }
1391 return false;
1392 }
1393
1394 private static boolean isSendWindowUsed(int maxDatagramSize, int len) {
1395 return len < maxDatagramSize;
1396 }
1397
1398 private static int calculateSendBufferLength(long connAddr, int maxDatagramSize) {
1399 int len = Math.min(maxDatagramSize, Quiche.quiche_conn_send_quantum(connAddr));
1400 if (len <= 0) {
1401
1402
1403
1404
1405 return 8;
1406 }
1407 return len;
1408 }
1409
1410
1411
1412
1413
/**
 * Runs one send pass for {@code conn}: writes the packets quiche produced to the parent channel (segmented
 * or one-by-one, depending on the configured {@code SegmentedDatagramPacketAllocator}), then runs or
 * schedules SSL tasks that were created and finally (re-)schedules the timeout.
 * <p>
 * The method is guarded against re-entrance via {@code IN_CONNECTION_SEND}; a nested call only triggers the
 * early-data notification and returns {@code SendResult.NONE}.
 *
 * @param conn the connection to send for.
 * @return the result of the send pass, {@code SendResult.NONE} if the connection is freed or this is a
 *         re-entrant call.
 */
private SendResult connectionSend(QuicheQuicConnection conn) {
    if (conn.isFreed()) {
        return SendResult.NONE;
    }
    if ((reantranceGuard & IN_CONNECTION_SEND) != 0) {
        // Re-entrant call: do not send again, just notify about early data if needed.
        notifyEarlyDataReadyIfNeeded(conn);
        return SendResult.NONE;
    }

    reantranceGuard |= IN_CONNECTION_SEND;
    try {
        SendResult sendResult;
        SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator =
                config.getSegmentedDatagramPacketAllocator();
        // Segments are only used if the allocator supports at least one.
        if (segmentedDatagramPacketAllocator.maxNumSegments() > 0) {
            sendResult = connectionSendSegments(conn, segmentedDatagramPacketAllocator);
        } else {
            sendResult = connectionSendSimple(conn);
        }

        // Run or schedule all SSL tasks that were created.
        Runnable task = conn.sslTask();
        if (task != null) {
            if (runTasksDirectly()) {
                // Run all tasks inline.
                do {
                    task.run();
                    // Notify about early data after each task if needed.
                    notifyEarlyDataReadyIfNeeded(conn);
                } while ((task = conn.sslTask()) != null);

                // Send again after the tasks ran. This is scheduled on the EventLoop instead of being
                // called directly, as a direct call would hit the re-entrance guard above.
                eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // Flush only if the send pass produced something.
                        if (connectionSend(conn) != SendResult.NONE) {
                            forceFlushParent();
                        }
                        freeIfClosed();
                    }
                });
            } else {
                // Offload to the SSL task executor; it sends again once done.
                runAllTaskSend(conn, task);
            }
        } else {
            // No tasks, just notify about early data if needed.
            notifyEarlyDataReadyIfNeeded(conn);
        }

        // After every send pass the timeout is (re-)scheduled as well.
        timeoutHandler.scheduleTimeout();
        return sendResult;
    } finally {
        reantranceGuard &= ~IN_CONNECTION_SEND;
    }
}
1474
1475 private final class QuicChannelUnsafe extends AbstractChannel.AbstractUnsafe {
1476
1477 void connectStream(QuicStreamType type, @Nullable ChannelHandler handler,
1478 Promise<QuicStreamChannel> promise) {
1479 if (!promise.setUncancellable()) {
1480 return;
1481 }
1482 long streamId = idGenerator.nextStreamId(type == QuicStreamType.BIDIRECTIONAL);
1483
1484 try {
1485 int res = streamSend0(connection, streamId, Unpooled.EMPTY_BUFFER, false);
1486 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
1487 throw Quiche.convertToException(res);
1488 }
1489 } catch (Exception e) {
1490 promise.setFailure(e);
1491 return;
1492 }
1493 if (type == QuicStreamType.UNIDIRECTIONAL) {
1494 UNI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1495 } else {
1496 BIDI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1497 }
1498 QuicheQuicStreamChannel streamChannel = addNewStreamChannel(streamId);
1499 if (handler != null) {
1500 streamChannel.pipeline().addLast(handler);
1501 }
1502 eventLoop().register(streamChannel).addListener((ChannelFuture f) -> {
1503 if (f.isSuccess()) {
1504 promise.setSuccess(streamChannel);
1505 } else {
1506 promise.setFailure(f.cause());
1507 streams.remove(streamId);
1508 }
1509 });
1510 }
1511
/**
 * Starts connecting this (client) channel. Only a {@link QuicConnectionAddress} is accepted as
 * {@code remote}; it is stored together with the promise, an optional connect timeout is scheduled and the
 * connect is delegated to the parent channel.
 * <p>
 * The promise is failed with {@link UnsupportedOperationException} for server channels and for other address
 * types, with {@link ConnectionPendingException} if a connect is already in progress and with
 * {@link AlreadyConnectedException} if source connection ids are already assigned.
 *
 * @param remote the address to connect to.
 * @param local not used by this implementation.
 * @param channelPromise the promise to notify; nothing happens if it can not be marked as uncancellable.
 */
@Override
public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
    assert eventLoop().inEventLoop();
    if (!channelPromise.setUncancellable()) {
        return;
    }
    if (server) {
        channelPromise.setFailure(new UnsupportedOperationException());
        return;
    }

    if (connectPromise != null) {
        channelPromise.setFailure(new ConnectionPendingException());
        return;
    }

    if (remote instanceof QuicConnectionAddress) {
        if (!sourceConnectionIds.isEmpty()) {
            // Source connection ids are present already, so this channel was connected before.
            channelPromise.setFailure(new AlreadyConnectedException());
            return;
        }

        connectAddress = (QuicConnectionAddress) remote;
        connectPromise = channelPromise;

        // Schedule the connect timeout if one is configured.
        int connectTimeoutMillis = config().getConnectTimeoutMillis();
        if (connectTimeoutMillis > 0) {
            connectTimeoutFuture = eventLoop().schedule(() -> {
                ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
                // Only close if the promise was still pending and this call is the one failing it.
                if (connectPromise != null && !connectPromise.isDone()
                        && connectPromise.tryFailure(new ConnectTimeoutException(
                        "connection timed out: " + remote))) {
                    close(voidPromise());
                }
            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
        }

        connectPromise.addListener((ChannelFuture future) -> {
            // On cancellation stop the timeout, forget the promise and close.
            if (future.isCancelled()) {
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
                close(voidPromise());
            }
        });

        parent().connect(new QuicheQuicChannelAddress(QuicheQuicChannel.this))
                .addListener(f -> {
                    ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
                    if (connectPromise != null && !f.isSuccess()) {
                        connectPromise.tryFailure(f.cause());
                        // Close everything after the failure was reported.
                        unsafe().closeForcibly();
                    }
                });
        return;
    }

    // Any other address type is not supported.
    channelPromise.setFailure(new UnsupportedOperationException());
}
1575
1576 private void fireConnectCloseEventIfNeeded(QuicheQuicConnection conn) {
1577 if (connectionCloseEvent == null && !conn.isFreed()) {
1578 connectionCloseEvent = Quiche.quiche_conn_peer_error(conn.address());
1579 if (connectionCloseEvent != null) {
1580 pipeline().fireUserEventTriggered(connectionCloseEvent);
1581 }
1582 }
1583 }
1584
/**
 * Feeds the readable bytes of {@code buffer} into quiche via {@code quiche_conn_recv}, calling it repeatedly
 * until all bytes were consumed, quiche returned a negative result or the connection got freed. After each
 * call created SSL tasks are run or scheduled and the received data is processed.
 * <p>
 * The bytes consumed by quiche are skipped in the buffer that was handed to quiche. If quiche reports an
 * error for which {@code Quiche.shouldClose(...)} is {@code true}, the channel is closed afterwards.
 *
 * @param sender the address the datagram was received from; becomes the cached remote address.
 * @param recipient the address the datagram was sent to; becomes the cached local address.
 * @param buffer the received bytes.
 */
void connectionRecv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
    QuicheQuicConnection conn = QuicheQuicChannel.this.connection;
    if (conn.isFreed()) {
        return;
    }
    int bufferReadable = buffer.readableBytes();
    if (bufferReadable == 0) {
        // Nothing to hand to quiche for an empty buffer.
        return;
    }

    reantranceGuard |= IN_RECV;
    boolean close = false;
    try {
        ByteBuf tmpBuffer = null;
        // A read-only buffer is copied into a fresh direct buffer first.
        // NOTE(review): presumably because the native recv call may modify the input — confirm.
        if (buffer.isReadOnly()) {
            tmpBuffer = alloc().directBuffer(buffer.readableBytes());
            tmpBuffer.writeBytes(buffer);
            buffer = tmpBuffer;
        }
        long memoryAddress = Quiche.readerMemoryAddress(buffer);

        ByteBuffer recvInfo = conn.nextRecvInfo();
        QuicheRecvInfo.setRecvInfo(recvInfo, sender, recipient);

        remote = sender;
        local = recipient;

        try {
            do {
                // Call quiche_conn_recv(...) until all bytes are consumed or a negative result is returned.
                int res = Quiche.quiche_conn_recv(conn.address(), memoryAddress, bufferReadable,
                        Quiche.memoryAddressWithPosition(recvInfo));
                final boolean done;
                if (res < 0) {
                    done = true;
                    if (res != Quiche.QUICHE_ERR_DONE) {
                        close = Quiche.shouldClose(res);
                        Exception e = Quiche.convertToException(res);
                        // If this failed the connect promise there is nothing more to process.
                        if (tryFailConnectPromise(e)) {
                            break;
                        }
                        fireExceptionEvents(conn, e);
                    }
                } else {
                    done = false;
                }
                // Run or schedule all SSL tasks that were created.
                Runnable task = conn.sslTask();
                if (task != null) {
                    if (runTasksDirectly()) {
                        // Run all tasks inline, then process what was received.
                        do {
                            task.run();
                        } while ((task = conn.sslTask()) != null);
                        processReceived(conn);
                    } else {
                        // Offloaded; processing happens once the tasks are done.
                        runAllTaskRecv(conn, task);
                    }
                } else {
                    processReceived(conn);
                }

                if (done) {
                    break;
                }
                // Advance past the bytes quiche consumed.
                memoryAddress += res;
                bufferReadable -= res;
            } while (bufferReadable > 0 && !conn.isFreed());
        } finally {
            // Reflect the consumed bytes in the reader index of the buffer.
            buffer.skipBytes((int) (memoryAddress - Quiche.readerMemoryAddress(buffer)));
            if (tmpBuffer != null) {
                tmpBuffer.release();
            }
        }
        if (close) {
            // quiche reported an error that requires closing the channel.
            unsafe().close(newPromise());
        }
    } finally {
        reantranceGuard &= ~IN_RECV;
    }
}
1671
1672 private void processReceived(QuicheQuicConnection conn) {
1673
1674 if (handlePendingChannelActive(conn)) {
1675
1676 return;
1677 }
1678
1679 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1680 fireConnectCloseEventIfNeeded(conn);
1681
1682 if (conn.isFreed()) {
1683 return;
1684 }
1685
1686 long connAddr = conn.address();
1687 if (Quiche.quiche_conn_is_established(connAddr) ||
1688 Quiche.quiche_conn_is_in_early_data(connAddr)) {
1689 long uniLeftOld = uniStreamsLeft;
1690 long bidiLeftOld = bidiStreamsLeft;
1691
1692 if (uniLeftOld == 0 || bidiLeftOld == 0) {
1693 long uniLeft = Quiche.quiche_conn_peer_streams_left_uni(connAddr);
1694 long bidiLeft = Quiche.quiche_conn_peer_streams_left_bidi(connAddr);
1695 uniStreamsLeft = uniLeft;
1696 bidiStreamsLeft = bidiLeft;
1697 if (uniLeftOld != uniLeft || bidiLeftOld != bidiLeft) {
1698 pipeline().fireUserEventTriggered(QuicStreamLimitChangedEvent.INSTANCE);
1699 }
1700 }
1701
1702 handlePathEvents(conn);
1703
1704 if (handleWritableStreams(conn)) {
1705
1706 flushParent();
1707 }
1708
1709 datagramReadable = true;
1710 streamReadable = true;
1711
1712 recvDatagram(conn);
1713 recvStream(conn);
1714 }
1715 }
1716
/**
 * Drains all pending path events from quiche and fires a matching {@code QuicPathEvent} as user event
 * through the pipeline for each known event type. Every native event is freed after it was handled, and
 * draining stops once the connection is freed.
 */
private void handlePathEvents(QuicheQuicConnection conn) {
    long event;
    // A value that is not greater than 0 ends the loop (no further event).
    while (!conn.isFreed() && (event = Quiche.quiche_conn_path_event_next(conn.address())) > 0) {
        try {
            int type = Quiche.quiche_path_event_type(event);

            // Each native accessor returns the event fields as Object[]; the casts below rely on
            // the order used by that accessor.
            if (type == Quiche.QUICHE_PATH_EVENT_NEW) {
                Object[] ret = Quiche.quiche_path_event_new(event);
                InetSocketAddress local = (InetSocketAddress) ret[0];
                InetSocketAddress peer = (InetSocketAddress) ret[1];
                pipeline().fireUserEventTriggered(new QuicPathEvent.New(local, peer));
            } else if (type == Quiche.QUICHE_PATH_EVENT_VALIDATED) {
                Object[] ret = Quiche.quiche_path_event_validated(event);
                InetSocketAddress local = (InetSocketAddress) ret[0];
                InetSocketAddress peer = (InetSocketAddress) ret[1];
                pipeline().fireUserEventTriggered(new QuicPathEvent.Validated(local, peer));
            } else if (type == Quiche.QUICHE_PATH_EVENT_FAILED_VALIDATION) {
                Object[] ret = Quiche.quiche_path_event_failed_validation(event);
                InetSocketAddress local = (InetSocketAddress) ret[0];
                InetSocketAddress peer = (InetSocketAddress) ret[1];
                pipeline().fireUserEventTriggered(new QuicPathEvent.FailedValidation(local, peer));
            } else if (type == Quiche.QUICHE_PATH_EVENT_CLOSED) {
                Object[] ret = Quiche.quiche_path_event_closed(event);
                InetSocketAddress local = (InetSocketAddress) ret[0];
                InetSocketAddress peer = (InetSocketAddress) ret[1];
                pipeline().fireUserEventTriggered(new QuicPathEvent.Closed(local, peer));
            } else if (type == Quiche.QUICHE_PATH_EVENT_REUSED_SOURCE_CONNECTION_ID) {
                // This event carries a sequence number plus the old and the new address pair.
                Object[] ret = Quiche.quiche_path_event_reused_source_connection_id(event);
                Long seq = (Long) ret[0];
                InetSocketAddress localOld = (InetSocketAddress) ret[1];
                InetSocketAddress peerOld = (InetSocketAddress) ret[2];
                InetSocketAddress local = (InetSocketAddress) ret[3];
                InetSocketAddress peer = (InetSocketAddress) ret[4];
                pipeline().fireUserEventTriggered(
                        new QuicPathEvent.ReusedSourceConnectionId(seq, localOld, peerOld, local, peer));
            } else if (type == Quiche.QUICHE_PATH_EVENT_PEER_MIGRATED) {
                Object[] ret = Quiche.quiche_path_event_peer_migrated(event);
                InetSocketAddress local = (InetSocketAddress) ret[0];
                InetSocketAddress peer = (InetSocketAddress) ret[1];
                pipeline().fireUserEventTriggered(new QuicPathEvent.PeerMigrated(local, peer));
            }
            // Unknown event types are dropped silently.
        } finally {
            // Always free the native event, even if a handler threw.
            Quiche.quiche_path_event_free(event);
        }
    }
}
1763
1764 private void runAllTaskRecv(QuicheQuicConnection conn, Runnable task) {
1765 sslTaskExecutor.execute(decorateTaskRecv(conn, task));
1766 }
1767
1768 private Runnable decorateTaskRecv(QuicheQuicConnection conn, Runnable task) {
1769 return () -> {
1770 try {
1771 runAll(conn, task);
1772 } finally {
1773
1774 eventLoop().execute(() -> {
1775 if (!conn.isFreed()) {
1776 processReceived(conn);
1777
1778
1779 if (connectionSend(conn) != SendResult.NONE) {
1780 forceFlushParent();
1781 }
1782
1783 freeIfClosed();
1784 }
1785 });
1786 }
1787 };
1788 }
1789 void recv() {
1790 QuicheQuicConnection conn = connection;
1791 if ((reantranceGuard & IN_RECV) != 0 || conn.isFreed()) {
1792 return;
1793 }
1794
1795 long connAddr = conn.address();
1796
1797 if (!Quiche.quiche_conn_is_established(connAddr) &&
1798 !Quiche.quiche_conn_is_in_early_data(connAddr)) {
1799 return;
1800 }
1801
1802 reantranceGuard |= IN_RECV;
1803 try {
1804 recvDatagram(conn);
1805 recvStream(conn);
1806 } finally {
1807 fireChannelReadCompleteIfNeeded();
1808 reantranceGuard &= ~IN_RECV;
1809 }
1810 }
1811
/**
 * Walks over all streams quiche reports as readable and notifies the matching stream channels. For a stream
 * id without a channel a new stream channel is created and fired through the pipeline via
 * {@code fireChannelRead}. Streams are only processed if a stream read is pending and streams are marked as
 * readable.
 */
private void recvStream(QuicheQuicConnection conn) {
    if (conn.isFreed()) {
        return;
    }
    long connAddr = conn.address();
    long readableIterator = Quiche.quiche_conn_readable(connAddr);
    int totalReadable = 0;
    // -1 means there is no iterator to work with (and nothing to free).
    if (readableIterator != -1) {
        try {
            // Once a read was requested, all readable streams are processed in one go.
            if (recvStreamPending && streamReadable) {
                for (;;) {
                    int readable = Quiche.quiche_stream_iter_next(
                            readableIterator, readableStreams);
                    for (int i = 0; i < readable; i++) {
                        long streamId = readableStreams[i];
                        QuicheQuicStreamChannel streamChannel = streams.get(streamId);
                        if (streamChannel == null) {
                            // A stream not seen before: this satisfies the pending read, so create
                            // the channel and hand it to the pipeline.
                            recvStreamPending = false;
                            fireChannelReadCompletePending = true;
                            streamChannel = addNewStreamChannel(streamId);
                            streamChannel.readable();
                            pipeline().fireChannelRead(streamChannel);
                        } else {
                            streamChannel.readable();
                        }
                    }
                    if (readable < readableStreams.length) {
                        // The batch did not fill the array, so all readable streams were consumed.
                        streamReadable = false;
                        break;
                    }
                    // Only batches that filled the whole array are counted here.
                    if (readable > 0) {
                        totalReadable += readable;
                    }
                }
            }
        } finally {
            Quiche.quiche_stream_iter_free(readableIterator);
        }
        readableStreams = growIfNeeded(readableStreams, totalReadable);
    }
}
1855
1856 private void recvDatagram(QuicheQuicConnection conn) {
1857 if (!supportsDatagram) {
1858 return;
1859 }
1860 while (recvDatagramPending && datagramReadable && !conn.isFreed()) {
1861 @SuppressWarnings("deprecation")
1862 RecvByteBufAllocator.Handle recvHandle = recvBufAllocHandle();
1863 recvHandle.reset(config());
1864
1865 int numMessagesRead = 0;
1866 do {
1867 long connAddr = conn.address();
1868 int len = Quiche.quiche_conn_dgram_recv_front_len(connAddr);
1869 if (len == Quiche.QUICHE_ERR_DONE) {
1870 datagramReadable = false;
1871 return;
1872 }
1873
1874 ByteBuf datagramBuffer = alloc().directBuffer(len);
1875 recvHandle.attemptedBytesRead(datagramBuffer.writableBytes());
1876 int writerIndex = datagramBuffer.writerIndex();
1877 long memoryAddress = Quiche.writerMemoryAddress(datagramBuffer);
1878
1879 int written = Quiche.quiche_conn_dgram_recv(connAddr,
1880 memoryAddress, datagramBuffer.writableBytes());
1881 if (written < 0) {
1882 datagramBuffer.release();
1883 if (written == Quiche.QUICHE_ERR_DONE) {
1884
1885 datagramReadable = false;
1886 break;
1887 }
1888 pipeline().fireExceptionCaught(Quiche.convertToException(written));
1889 }
1890 recvHandle.lastBytesRead(written);
1891 recvHandle.incMessagesRead(1);
1892 numMessagesRead++;
1893 datagramBuffer.writerIndex(writerIndex + written);
1894 recvDatagramPending = false;
1895 fireChannelReadCompletePending = true;
1896
1897 pipeline().fireChannelRead(datagramBuffer);
1898 } while (recvHandle.continueReading() && !conn.isFreed());
1899 recvHandle.readComplete();
1900
1901
1902 if (numMessagesRead > 0) {
1903 fireChannelReadCompleteIfNeeded();
1904 }
1905 }
1906 }
1907
/**
 * Moves the channel to {@code ChannelState.ACTIVE} and fires {@code channelActive} (followed by the
 * handshake completion and datagram extension notifications) once quiche reports the connection as
 * established. For clients this also completes the pending connect promise.
 *
 * @param conn the connection to check.
 * @return {@code true} if the caller should stop processing: the connection is freed, the channel is closed,
 *         or the connect promise could not be marked as success and the channel is being closed.
 */
private boolean handlePendingChannelActive(QuicheQuicConnection conn) {
    if (conn.isFreed() || state == ChannelState.CLOSED) {
        return true;
    }
    if (server) {
        if (state == ChannelState.OPEN && Quiche.quiche_conn_is_established(conn.address())) {
            // channelActive was not fired yet: update the state and fire the events.
            state = ChannelState.ACTIVE;

            pipeline().fireChannelActive();
            notifyAboutHandshakeCompletionIfNeeded(conn, null);
            fireDatagramExtensionEvent(conn);
        }
    } else if (connectPromise != null && Quiche.quiche_conn_is_established(conn.address())) {
        // Client side: the connect is finished. Clear the field before notifying anyone.
        ChannelPromise promise = connectPromise;
        connectPromise = null;
        state = ChannelState.ACTIVE;

        boolean promiseSet = promise.trySuccess();
        pipeline().fireChannelActive();
        notifyAboutHandshakeCompletionIfNeeded(conn, null);
        fireDatagramExtensionEvent(conn);
        if (!promiseSet) {
            // The promise was completed by someone else already, so close the channel again.
            fireConnectCloseEventIfNeeded(conn);
            this.close(this.voidPromise());
            return true;
        }
    }
    return false;
}
1938
1939 private void fireDatagramExtensionEvent(QuicheQuicConnection conn) {
1940 if (conn.isClosed()) {
1941 return;
1942 }
1943 long connAddr = conn.address();
1944 int len = Quiche.quiche_conn_dgram_max_writable_len(connAddr);
1945
1946 if (len != Quiche.QUICHE_ERR_DONE) {
1947 pipeline().fireUserEventTriggered(new QuicDatagramExtensionEvent(len));
1948 }
1949 }
1950
1951 private QuicheQuicStreamChannel addNewStreamChannel(long streamId) {
1952 QuicheQuicStreamChannel streamChannel = new QuicheQuicStreamChannel(
1953 QuicheQuicChannel.this, streamId);
1954 QuicheQuicStreamChannel old = streams.put(streamId, streamChannel);
1955 assert old == null;
1956 streamChannel.writable(streamCapacity(streamId));
1957 return streamChannel;
1958 }
1959 }
1960
1961
1962
1963
1964 void finishConnect() {
1965 assert !server;
1966 assert connection != null;
1967 if (connectionSend(connection) != SendResult.NONE) {
1968 flushParent();
1969 }
1970 }
1971
1972 private void notifyEarlyDataReadyIfNeeded(QuicheQuicConnection conn) {
1973 if (!server && !earlyDataReadyNotified &&
1974 !conn.isFreed() && Quiche.quiche_conn_is_in_early_data(conn.address())) {
1975 earlyDataReadyNotified = true;
1976 pipeline().fireUserEventTriggered(SslEarlyDataReadyEvent.INSTANCE);
1977 }
1978 }
1979
1980 private final class TimeoutHandler implements Runnable {
1981 private ScheduledFuture<?> timeoutFuture;
1982
1983 @Override
1984 public void run() {
1985 QuicheQuicConnection conn = connection;
1986 if (conn.isFreed()) {
1987 return;
1988 }
1989 if (!freeIfClosed()) {
1990 long connAddr = conn.address();
1991 timeoutFuture = null;
1992
1993 Quiche.quiche_conn_on_timeout(connAddr);
1994 if (!freeIfClosed()) {
1995
1996
1997 if (connectionSend(conn) != SendResult.NONE) {
1998 flushParent();
1999 }
2000 boolean closed = freeIfClosed();
2001 if (!closed) {
2002
2003 scheduleTimeout();
2004 }
2005 }
2006 }
2007 }
2008
2009
2010
2011 void scheduleTimeout() {
2012 QuicheQuicConnection conn = connection;
2013 if (conn.isFreed()) {
2014 cancel();
2015 return;
2016 }
2017 if (conn.isClosed()) {
2018 cancel();
2019 unsafe().close(newPromise());
2020 return;
2021 }
2022 long nanos = Quiche.quiche_conn_timeout_as_nanos(conn.address());
2023 if (nanos < 0 || nanos == Long.MAX_VALUE) {
2024
2025 cancel();
2026 return;
2027 }
2028 if (timeoutFuture == null) {
2029 timeoutFuture = eventLoop().schedule(this,
2030 nanos, TimeUnit.NANOSECONDS);
2031 } else {
2032 long remaining = timeoutFuture.getDelay(TimeUnit.NANOSECONDS);
2033 if (remaining <= 0) {
2034
2035
2036 cancel();
2037 run();
2038 } else if (remaining > nanos) {
2039
2040
2041 cancel();
2042 timeoutFuture = eventLoop().schedule(this, nanos, TimeUnit.NANOSECONDS);
2043 }
2044 }
2045 }
2046
2047 void cancel() {
2048 if (timeoutFuture != null) {
2049 timeoutFuture.cancel(false);
2050 timeoutFuture = null;
2051 }
2052 }
2053 }
2054
2055 @Override
2056 public Future<QuicConnectionStats> collectStats(Promise<QuicConnectionStats> promise) {
2057 if (eventLoop().inEventLoop()) {
2058 collectStats0(promise);
2059 } else {
2060 eventLoop().execute(() -> collectStats0(promise));
2061 }
2062 return promise;
2063 }
2064
2065 private void collectStats0(Promise<QuicConnectionStats> promise) {
2066 QuicheQuicConnection conn = connection;
2067 if (conn.isFreed()) {
2068 promise.setSuccess(statsAtClose);
2069 return;
2070 }
2071
2072 collectStats0(connection, promise);
2073 }
2074
2075 @Nullable
2076 private QuicConnectionStats collectStats0(QuicheQuicConnection connection, Promise<QuicConnectionStats> promise) {
2077 final long[] stats = Quiche.quiche_conn_stats(connection.address());
2078 if (stats == null) {
2079 promise.setFailure(new IllegalStateException("native quiche_conn_stats(...) failed"));
2080 return null;
2081 }
2082
2083 final QuicheQuicConnectionStats connStats =
2084 new QuicheQuicConnectionStats(stats);
2085 promise.setSuccess(connStats);
2086 return connStats;
2087 }
2088
2089 @Override
2090 public Future<QuicConnectionPathStats> collectPathStats(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2091 if (eventLoop().inEventLoop()) {
2092 collectPathStats0(pathIdx, promise);
2093 } else {
2094 eventLoop().execute(() -> collectPathStats0(pathIdx, promise));
2095 }
2096 return promise;
2097 }
2098
2099 private void collectPathStats0(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2100 QuicheQuicConnection conn = connection;
2101 if (conn.isFreed()) {
2102 promise.setFailure(new IllegalStateException("Connection is closed"));
2103 return;
2104 }
2105
2106 final Object[] stats = Quiche.quiche_conn_path_stats(connection.address(), pathIdx);
2107 if (stats == null) {
2108 promise.setFailure(new IllegalStateException("native quiche_conn_path_stats(...) failed"));
2109 return;
2110 }
2111 promise.setSuccess(new QuicheQuicConnectionPathStats(stats));
2112 }
2113
2114 @Override
2115 public QuicTransportParameters peerTransportParameters() {
2116 return connection.peerParameters();
2117 }
2118 }