1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.quic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.AbstractChannel;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandler;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.RecvByteBufAllocator;
35 import io.netty.channel.socket.DatagramPacket;
36 import io.netty.handler.ssl.SniCompletionEvent;
37 import io.netty.handler.ssl.SslHandshakeCompletionEvent;
38 import io.netty.util.AttributeKey;
39 import io.netty.util.collection.LongObjectHashMap;
40 import io.netty.util.collection.LongObjectMap;
41 import io.netty.util.concurrent.Future;
42 import io.netty.util.concurrent.ImmediateEventExecutor;
43 import io.netty.util.concurrent.ImmediateExecutor;
44 import io.netty.util.concurrent.Promise;
45 import io.netty.util.internal.StringUtil;
46 import io.netty.util.internal.logging.InternalLogger;
47 import io.netty.util.internal.logging.InternalLoggerFactory;
48 import org.jetbrains.annotations.Nullable;
49
50 import javax.net.ssl.SSLEngine;
51 import javax.net.ssl.SSLHandshakeException;
52 import java.io.File;
53 import java.net.ConnectException;
54 import java.net.InetSocketAddress;
55 import java.net.SocketAddress;
56 import java.nio.BufferUnderflowException;
57 import java.nio.ByteBuffer;
58 import java.nio.channels.AlreadyConnectedException;
59 import java.nio.channels.ClosedChannelException;
60 import java.nio.channels.ConnectionPendingException;
61 import java.util.ArrayList;
62 import java.util.Collections;
63 import java.util.HashSet;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Set;
67 import java.util.concurrent.Executor;
68 import java.util.concurrent.ScheduledFuture;
69 import java.util.concurrent.TimeUnit;
70 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
71 import java.util.function.Consumer;
72 import java.util.function.Function;
73
74
75
76
77 final class QuicheQuicChannel extends AbstractChannel implements QuicChannel {
78 private static final InternalLogger logger = InternalLoggerFactory.getInstance(QuicheQuicChannel.class);
79 private static final String QLOG_FILE_EXTENSION = ".qlog";
80
/**
 * Outcome of a single {@code streamRecv(long, ByteBuf)} call.
 */
enum StreamRecvResult {
    /**
     * quiche reported {@code QUICHE_ERR_DONE}: nothing could be read from the stream right now.
     */
    DONE,

    /**
     * Data was read and the FIN flag reported by quiche was set.
     */
    FIN,

    /**
     * Data was read and the FIN flag was not set.
     */
    OK
}
95
96 private enum ChannelState {
97 OPEN,
98 ACTIVE,
99 CLOSED
100 }
101
102 private enum SendResult {
103 SOME,
104 NONE,
105 CLOSE
106 }
107
108 private static final class CloseData implements ChannelFutureListener {
109 final boolean applicationClose;
110 final int err;
111 final ByteBuf reason;
112
113 CloseData(boolean applicationClose, int err, ByteBuf reason) {
114 this.applicationClose = applicationClose;
115 this.err = err;
116 this.reason = reason;
117 }
118
119 @Override
120 public void operationComplete(ChannelFuture future) {
121 reason.release();
122 }
123 }
124
125 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
126 private long[] readableStreams = new long[4];
127 private long[] writableStreams = new long[4];
128 private final LongObjectMap<QuicheQuicStreamChannel> streams = new LongObjectHashMap<>();
129 private final QuicheQuicChannelConfig config;
130 private final boolean server;
131 private final QuicStreamIdGenerator idGenerator;
132 private final ChannelHandler streamHandler;
133 private final Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray;
134 private final Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray;
135 private final TimeoutHandler timeoutHandler;
136 private final QuicConnectionIdGenerator connectionIdAddressGenerator;
137 private final QuicResetTokenGenerator resetTokenGenerator;
138 private final Set<ByteBuffer> sourceConnectionIds = new HashSet<>();
139
140 private Consumer<QuicheQuicChannel> freeTask;
141 private Executor sslTaskExecutor;
142 private boolean inFireChannelReadCompleteQueue;
143 private boolean fireChannelReadCompletePending;
144 private ByteBuf finBuffer;
145 private ByteBuf outErrorCodeBuffer;
146 private ChannelPromise connectPromise;
147 private ScheduledFuture<?> connectTimeoutFuture;
148 private QuicConnectionAddress connectAddress;
149 private CloseData closeData;
150 private QuicConnectionCloseEvent connectionCloseEvent;
151 private QuicConnectionStats statsAtClose;
152 private boolean supportsDatagram;
153 private boolean recvDatagramPending;
154 private boolean datagramReadable;
155 private boolean recvStreamPending;
156 private boolean streamReadable;
157 private boolean handshakeCompletionNotified;
158 private boolean earlyDataReadyNotified;
159 private int reantranceGuard;
160 private static final int IN_RECV = 1 << 1;
161 private static final int IN_CONNECTION_SEND = 1 << 2;
162 private static final int IN_HANDLE_WRITABLE_STREAMS = 1 << 3;
163 private volatile ChannelState state = ChannelState.OPEN;
164 private volatile boolean timedOut;
165 private volatile String traceId;
166 private volatile QuicheQuicConnection connection;
167 private volatile InetSocketAddress local;
168 private volatile InetSocketAddress remote;
169
170 private final ChannelFutureListener continueSendingListener = f -> {
171 if (connectionSend(connection) != SendResult.NONE) {
172 flushParent();
173 }
174 };
175
176 private static final AtomicLongFieldUpdater<QuicheQuicChannel> UNI_STREAMS_LEFT_UPDATER =
177 AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "uniStreamsLeft");
178 private volatile long uniStreamsLeft;
179
180 private static final AtomicLongFieldUpdater<QuicheQuicChannel> BIDI_STREAMS_LEFT_UPDATER =
181 AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "bidiStreamsLeft");
182 private volatile long bidiStreamsLeft;
183
/**
 * Creates a new channel. Use {@code forClient(...)} or {@code forServer(...)} rather than calling
 * this constructor directly.
 *
 * @param parent                       the parent (datagram) channel
 * @param server                       {@code true} for the server side of a connection
 * @param key                          initial source connection id, or {@code null} if none is known yet
 * @param local                        local socket address
 * @param remote                       remote socket address
 * @param supportsDatagram             whether the datagram extension may be used for writes
 * @param streamHandler                handler installed on stream channels
 * @param streamOptionsArray           channel options applied to stream channels
 * @param streamAttrsArray             attributes applied to stream channels
 * @param freeTask                     callback run after the connection was freed, may be {@code null}
 * @param sslTaskExecutor              executor for SSL tasks; {@code null} selects {@link ImmediateExecutor}
 * @param connectionIdAddressGenerator generator for additional source connection ids, may be {@code null}
 * @param resetTokenGenerator          generator for reset tokens, may be {@code null}
 */
private QuicheQuicChannel(Channel parent, boolean server, @Nullable ByteBuffer key, InetSocketAddress local,
                          InetSocketAddress remote, boolean supportsDatagram, ChannelHandler streamHandler,
                          Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
                          Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
                          @Nullable Consumer<QuicheQuicChannel> freeTask,
                          @Nullable Executor sslTaskExecutor,
                          @Nullable QuicConnectionIdGenerator connectionIdAddressGenerator,
                          @Nullable QuicResetTokenGenerator resetTokenGenerator) {
    super(parent);
    config = new QuicheQuicChannelConfig(this);

    // Role and id handling.
    this.server = server;
    this.idGenerator = new QuicStreamIdGenerator(server);
    this.connectionIdAddressGenerator = connectionIdAddressGenerator;
    this.resetTokenGenerator = resetTokenGenerator;
    if (key != null) {
        this.sourceConnectionIds.add(key);
    }

    // Addressing and capabilities.
    this.local = local;
    this.remote = remote;
    this.supportsDatagram = supportsDatagram;

    // Stream bootstrap configuration.
    this.streamHandler = streamHandler;
    this.streamOptionsArray = streamOptionsArray;
    this.streamAttrsArray = streamAttrsArray;

    // Lifecycle helpers.
    this.freeTask = freeTask;
    timeoutHandler = new TimeoutHandler();
    if (sslTaskExecutor == null) {
        this.sslTaskExecutor = ImmediateExecutor.INSTANCE;
    } else {
        this.sslTaskExecutor = sslTaskExecutor;
    }
}
213
214 static QuicheQuicChannel forClient(Channel parent, InetSocketAddress local, InetSocketAddress remote,
215 ChannelHandler streamHandler,
216 Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
217 Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray) {
218 return new QuicheQuicChannel(parent, false, null, local, remote, false, streamHandler,
219 streamOptionsArray, streamAttrsArray, null, null, null, null);
220 }
221
222 static QuicheQuicChannel forServer(Channel parent, ByteBuffer key, InetSocketAddress local,
223 InetSocketAddress remote,
224 boolean supportsDatagram, ChannelHandler streamHandler,
225 Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
226 Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
227 Consumer<QuicheQuicChannel> freeTask, Executor sslTaskExecutor,
228 QuicConnectionIdGenerator connectionIdAddressGenerator,
229 QuicResetTokenGenerator resetTokenGenerator) {
230 return new QuicheQuicChannel(parent, true, key, local, remote, supportsDatagram,
231 streamHandler, streamOptionsArray, streamAttrsArray, freeTask,
232 sslTaskExecutor, connectionIdAddressGenerator, resetTokenGenerator);
233 }
234
235 private static final int MAX_ARRAY_LEN = 128;
236
237 private static long[] growIfNeeded(long[] array, int maxLength) {
238 if (maxLength > array.length) {
239 if (array.length == MAX_ARRAY_LEN) {
240 return array;
241 }
242
243 return new long[Math.min(MAX_ARRAY_LEN, array.length + 4)];
244 }
245 return array;
246 }
247
248 @Override
249 public boolean isTimedOut() {
250 return timedOut;
251 }
252
253 @Override
254 public SSLEngine sslEngine() {
255 QuicheQuicConnection connection = this.connection;
256 return connection == null ? null : connection.engine();
257 }
258
259 private void notifyAboutHandshakeCompletionIfNeeded(QuicheQuicConnection conn,
260 @Nullable SSLHandshakeException cause) {
261 if (handshakeCompletionNotified) {
262 return;
263 }
264 if (cause != null) {
265 pipeline().fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
266 return;
267 }
268 if (conn.isFreed()) {
269 return;
270 }
271 switch (connection.engine().getHandshakeStatus()) {
272 case NOT_HANDSHAKING:
273 case FINISHED:
274 handshakeCompletionNotified = true;
275 pipeline().fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
276 break;
277 default:
278 break;
279 }
280 }
281
282 @Override
283 public long peerAllowedStreams(QuicStreamType type) {
284 switch (type) {
285 case BIDIRECTIONAL:
286 return bidiStreamsLeft;
287 case UNIDIRECTIONAL:
288 return uniStreamsLeft;
289 default:
290 return 0;
291 }
292 }
293
294 void attachQuicheConnection(QuicheQuicConnection connection) {
295 this.connection = connection;
296
297 byte[] traceId = Quiche.quiche_conn_trace_id(connection.address());
298 if (traceId != null) {
299 this.traceId = new String(traceId);
300 }
301
302 connection.init(local, remote,
303 sniHostname -> pipeline().fireUserEventTriggered(new SniCompletionEvent(sniHostname)));
304
305
306 QLogConfiguration configuration = config.getQLogConfiguration();
307 if (configuration != null) {
308 final String fileName;
309 File file = new File(configuration.path());
310 if (file.isDirectory()) {
311
312 file.mkdir();
313 if (this.traceId != null) {
314 fileName = configuration.path() + File.separatorChar + this.traceId + "-" +
315 id().asShortText() + QLOG_FILE_EXTENSION;
316 } else {
317 fileName = configuration.path() + File.separatorChar + id().asShortText() + QLOG_FILE_EXTENSION;
318 }
319 } else {
320 fileName = configuration.path();
321 }
322
323 if (!Quiche.quiche_conn_set_qlog_path(connection.address(), fileName,
324 configuration.logTitle(), configuration.logDescription())) {
325 logger.info("Unable to create qlog file: {} ", fileName);
326 }
327 }
328 }
329
330 void connectNow(Function<QuicChannel, ? extends QuicSslEngine> engineProvider, Executor sslTaskExecutor,
331 Consumer<QuicheQuicChannel> freeTask, long configAddr, int localConnIdLength,
332 boolean supportsDatagram, ByteBuffer fromSockaddrMemory, ByteBuffer toSockaddrMemory)
333 throws Exception {
334 assert this.connection == null;
335 assert this.traceId == null;
336 assert this.sourceConnectionIds.isEmpty();
337
338 this.sslTaskExecutor = sslTaskExecutor;
339 this.freeTask = freeTask;
340
341 QuicConnectionAddress address = this.connectAddress;
342
343 if (address == QuicConnectionAddress.EPHEMERAL) {
344 address = QuicConnectionAddress.random(localConnIdLength);
345 }
346 ByteBuffer connectId = address.id();
347 if (connectId.remaining() != localConnIdLength) {
348 failConnectPromiseAndThrow(new IllegalArgumentException("connectionAddress has length "
349 + connectId.remaining()
350 + " instead of " + localConnIdLength));
351 }
352 QuicSslEngine engine = engineProvider.apply(this);
353 if (!(engine instanceof QuicheQuicSslEngine)) {
354 failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not of type "
355 + QuicheQuicSslEngine.class.getSimpleName()));
356 return;
357 }
358 if (!engine.getUseClientMode()) {
359 failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not create in client mode"));
360 }
361 QuicheQuicSslEngine quicheEngine = (QuicheQuicSslEngine) engine;
362 ByteBuf idBuffer = alloc().directBuffer(connectId.remaining()).writeBytes(connectId.duplicate());
363 try {
364 int fromSockaddrLen = SockaddrIn.setAddress(fromSockaddrMemory, local);
365 int toSockaddrLen = SockaddrIn.setAddress(toSockaddrMemory, remote);
366 QuicheQuicConnection connection = quicheEngine.createConnection(ssl ->
367 Quiche.quiche_conn_new_with_tls(Quiche.readerMemoryAddress(idBuffer),
368 idBuffer.readableBytes(), -1, -1,
369 Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
370 Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
371 configAddr, ssl, false));
372 if (connection == null) {
373 failConnectPromiseAndThrow(new ConnectException());
374 return;
375 }
376 attachQuicheConnection(connection);
377 QuicClientSessionCache sessionCache = quicheEngine.ctx.getSessionCache();
378 if (sessionCache != null) {
379 byte[] sessionBytes = sessionCache
380 .getSession(quicheEngine.getSession().getPeerHost(), quicheEngine.getSession().getPeerPort());
381 if (sessionBytes != null) {
382 Quiche.quiche_conn_set_session(connection.address(), sessionBytes);
383 }
384 }
385 this.supportsDatagram = supportsDatagram;
386 sourceConnectionIds.add(connectId);
387 } finally {
388 idBuffer.release();
389 }
390 }
391
392 private void failConnectPromiseAndThrow(Exception e) throws Exception {
393 tryFailConnectPromise(e);
394 throw e;
395 }
396
397 private boolean tryFailConnectPromise(Exception e) {
398 ChannelPromise promise = connectPromise;
399 if (promise != null) {
400 connectPromise = null;
401 promise.tryFailure(e);
402 return true;
403 }
404 return false;
405 }
406
407 Set<ByteBuffer> sourceConnectionIds() {
408 return sourceConnectionIds;
409 }
410
411 boolean markInFireChannelReadCompleteQueue() {
412 if (inFireChannelReadCompleteQueue) {
413 return false;
414 }
415 inFireChannelReadCompleteQueue = true;
416 return true;
417 }
418
419 private void failPendingConnectPromise() {
420 ChannelPromise promise = QuicheQuicChannel.this.connectPromise;
421 if (promise != null) {
422 QuicheQuicChannel.this.connectPromise = null;
423 promise.tryFailure(new QuicClosedChannelException(this.connectionCloseEvent));
424 }
425 }
426
427 void forceClose() {
428 unsafe().close(voidPromise());
429 }
430
431 @Override
432 protected DefaultChannelPipeline newChannelPipeline() {
433 return new DefaultChannelPipeline(this) {
434 @Override
435 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
436 if (msg instanceof QuicStreamChannel) {
437 QuicStreamChannel channel = (QuicStreamChannel) msg;
438 Quic.setupChannel(channel, streamOptionsArray, streamAttrsArray, streamHandler, logger);
439 ctx.channel().eventLoop().register(channel);
440 } else {
441 super.onUnhandledInboundMessage(ctx, msg);
442 }
443 }
444 };
445 }
446
447 @Override
448 public QuicChannel flush() {
449 super.flush();
450 return this;
451 }
452
453 @Override
454 public QuicChannel read() {
455 super.read();
456 return this;
457 }
458
459 @Override
460 public Future<QuicStreamChannel> createStream(QuicStreamType type, @Nullable ChannelHandler handler,
461 Promise<QuicStreamChannel> promise) {
462 if (eventLoop().inEventLoop()) {
463 ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise);
464 } else {
465 eventLoop().execute(() -> ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise));
466 }
467 return promise;
468 }
469
470 @Override
471 public ChannelFuture close(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
472 if (eventLoop().inEventLoop()) {
473 close0(applicationClose, error, reason, promise);
474 } else {
475 eventLoop().execute(() -> close0(applicationClose, error, reason, promise));
476 }
477 return promise;
478 }
479
480 private void close0(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
481 if (closeData == null) {
482 if (!reason.hasMemoryAddress()) {
483
484 ByteBuf copy = alloc().directBuffer(reason.readableBytes()).writeBytes(reason);
485 reason.release();
486 reason = copy;
487 }
488 closeData = new CloseData(applicationClose, error, reason);
489 promise.addListener(closeData);
490 } else {
491
492 reason.release();
493 }
494 close(promise);
495 }
496
497 @Override
498 public String toString() {
499 String traceId = this.traceId;
500 if (traceId == null) {
501 return "()" + super.toString();
502 } else {
503 return '(' + traceId + ')' + super.toString();
504 }
505 }
506
507 @Override
508 protected AbstractUnsafe newUnsafe() {
509 return new QuicChannelUnsafe();
510 }
511
512 @Override
513 protected boolean isCompatible(EventLoop eventLoop) {
514 return parent().eventLoop() == eventLoop;
515 }
516
517 @Override
518 @Nullable
519 protected QuicConnectionAddress localAddress0() {
520 QuicheQuicConnection connection = this.connection;
521 return connection == null ? null : connection.sourceId();
522 }
523
524 @Override
525 @Nullable
526 protected QuicConnectionAddress remoteAddress0() {
527 QuicheQuicConnection connection = this.connection;
528 return connection == null ? null : connection.destinationId();
529 }
530
531 @Override
532 @Nullable
533 public QuicConnectionAddress localAddress() {
534
535 return localAddress0();
536 }
537
538 @Override
539 @Nullable
540 public QuicConnectionAddress remoteAddress() {
541
542 return remoteAddress0();
543 }
544
545 @Override
546 @Nullable
547 public SocketAddress localSocketAddress() {
548 return local;
549 }
550
551 @Override
552 @Nullable
553 public SocketAddress remoteSocketAddress() {
554 return remote;
555 }
556
557 @Override
558 protected void doBind(SocketAddress socketAddress) {
559 throw new UnsupportedOperationException();
560 }
561
562 @Override
563 protected void doDisconnect() throws Exception {
564 doClose();
565 }
566
567 @Override
568 protected void doClose() throws Exception {
569 if (state == ChannelState.CLOSED) {
570 return;
571 }
572 state = ChannelState.CLOSED;
573
574 QuicheQuicConnection conn = this.connection;
575 if (conn == null || conn.isFreed()) {
576 if (closeData != null) {
577 closeData.reason.release();
578 closeData = null;
579 }
580 failPendingConnectPromise();
581 return;
582 }
583
584
585 SendResult sendResult = connectionSend(conn);
586
587 final boolean app;
588 final int err;
589 final ByteBuf reason;
590 if (closeData == null) {
591 app = false;
592 err = 0;
593 reason = Unpooled.EMPTY_BUFFER;
594 } else {
595 app = closeData.applicationClose;
596 err = closeData.err;
597 reason = closeData.reason;
598 closeData = null;
599 }
600
601 failPendingConnectPromise();
602 try {
603 int res = Quiche.quiche_conn_close(conn.address(), app, err,
604 Quiche.readerMemoryAddress(reason), reason.readableBytes());
605 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
606 throw Quiche.convertToException(res);
607 }
608
609
610
611
612 if (connectionSend(conn) == SendResult.SOME) {
613 sendResult = SendResult.SOME;
614 }
615 } finally {
616
617
618
619 statsAtClose = collectStats0(conn, eventLoop().newPromise());
620 try {
621 timedOut = Quiche.quiche_conn_is_timed_out(conn.address());
622
623 closeStreams();
624 if (finBuffer != null) {
625 finBuffer.release();
626 finBuffer = null;
627 }
628 if (outErrorCodeBuffer != null) {
629 outErrorCodeBuffer.release();
630 outErrorCodeBuffer = null;
631 }
632 } finally {
633 if (sendResult == SendResult.SOME) {
634
635 forceFlushParent();
636 } else {
637 flushParent();
638 }
639 conn.free();
640 if (freeTask != null) {
641 freeTask.accept(this);
642 }
643 timeoutHandler.cancel();
644
645 local = null;
646 remote = null;
647 }
648 }
649 }
650
651 @Override
652 protected void doBeginRead() {
653 recvDatagramPending = true;
654 recvStreamPending = true;
655 if (datagramReadable || streamReadable) {
656 ((QuicChannelUnsafe) unsafe()).recv();
657 }
658 }
659
660 @Override
661 protected Object filterOutboundMessage(Object msg) {
662 if (msg instanceof ByteBuf) {
663 return msg;
664 }
665 throw new UnsupportedOperationException("Unsupported message type: " + StringUtil.simpleClassName(msg));
666 }
667
668 @Override
669 protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) throws Exception {
670 if (!supportsDatagram) {
671 throw new UnsupportedOperationException("Datagram extension is not supported");
672 }
673 boolean sendSomething = false;
674 boolean retry = false;
675 QuicheQuicConnection conn = connection;
676 try {
677 for (;;) {
678 ByteBuf buffer = (ByteBuf) channelOutboundBuffer.current();
679 if (buffer == null) {
680 break;
681 }
682
683 int readable = buffer.readableBytes();
684 if (readable == 0) {
685
686 channelOutboundBuffer.remove();
687 continue;
688 }
689
690 final int res;
691 if (!buffer.isDirect() || buffer.nioBufferCount() > 1) {
692 ByteBuf tmpBuffer = alloc().directBuffer(readable);
693 try {
694 tmpBuffer.writeBytes(buffer, buffer.readerIndex(), readable);
695 res = sendDatagram(conn, tmpBuffer);
696 } finally {
697 tmpBuffer.release();
698 }
699 } else {
700 res = sendDatagram(conn, buffer);
701 }
702 if (res >= 0) {
703 channelOutboundBuffer.remove();
704 sendSomething = true;
705 retry = false;
706 } else {
707 if (res == Quiche.QUICHE_ERR_BUFFER_TOO_SHORT) {
708 retry = false;
709 channelOutboundBuffer.remove(new BufferUnderflowException());
710 } else if (res == Quiche.QUICHE_ERR_INVALID_STATE) {
711 throw new UnsupportedOperationException("Remote peer does not support Datagram extension");
712 } else if (res == Quiche.QUICHE_ERR_DONE) {
713 if (retry) {
714
715 for (;;) {
716 if (!channelOutboundBuffer.remove()) {
717
718 return;
719 }
720 }
721 }
722
723 sendSomething = false;
724
725
726 if (connectionSend(conn) != SendResult.NONE) {
727 forceFlushParent();
728 }
729
730 retry = true;
731 } else {
732 throw Quiche.convertToException(res);
733 }
734 }
735 }
736 } finally {
737 if (sendSomething && connectionSend(conn) != SendResult.NONE) {
738 flushParent();
739 }
740 }
741 }
742
743 private static int sendDatagram(QuicheQuicConnection conn, ByteBuf buf) throws ClosedChannelException {
744 return Quiche.quiche_conn_dgram_send(connectionAddressChecked(conn),
745 Quiche.readerMemoryAddress(buf), buf.readableBytes());
746 }
747
748 @Override
749 public QuicChannelConfig config() {
750 return config;
751 }
752
753 @Override
754 public boolean isOpen() {
755 return state != ChannelState.CLOSED;
756 }
757
758 @Override
759 public boolean isActive() {
760 return state == ChannelState.ACTIVE;
761 }
762
763 @Override
764 public ChannelMetadata metadata() {
765 return METADATA;
766 }
767
768
769
770
771
772 private void flushParent() {
773 if (!inFireChannelReadCompleteQueue) {
774 forceFlushParent();
775 }
776 }
777
778
779
780
781 private void forceFlushParent() {
782 parent().flush();
783 }
784
785 private static long connectionAddressChecked(@Nullable QuicheQuicConnection conn) throws ClosedChannelException {
786 if (conn == null || conn.isFreed()) {
787 throw new ClosedChannelException();
788 }
789 return conn.address();
790 }
791
792 boolean freeIfClosed() {
793 QuicheQuicConnection conn = connection;
794 if (conn == null || conn.isFreed()) {
795 return true;
796 }
797 if (conn.isClosed()) {
798 unsafe().close(newPromise());
799 return true;
800 }
801 return false;
802 }
803
804 private void closeStreams() {
805 if (streams.isEmpty()) {
806 return;
807 }
808 final ClosedChannelException closedChannelException;
809 if (isTimedOut()) {
810
811 closedChannelException = new QuicTimeoutClosedChannelException();
812 } else {
813 closedChannelException = new ClosedChannelException();
814 }
815
816
817 for (QuicheQuicStreamChannel stream: streams.values().toArray(new QuicheQuicStreamChannel[0])) {
818 stream.unsafe().close(closedChannelException, voidPromise());
819 }
820 streams.clear();
821 }
822
823 void streamPriority(long streamId, byte priority, boolean incremental) throws Exception {
824 int res = Quiche.quiche_conn_stream_priority(connectionAddressChecked(connection), streamId,
825 priority, incremental);
826 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
827 throw Quiche.convertToException(res);
828 }
829 }
830
831 void streamClosed(long streamId) {
832 streams.remove(streamId);
833 }
834
835 boolean isStreamLocalCreated(long streamId) {
836 return (streamId & 0x1) == (server ? 1 : 0);
837 }
838
839 QuicStreamType streamType(long streamId) {
840 return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
841 }
842
843 void streamShutdown(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
844 QuicheQuicConnection conn = this.connection;
845 final long connectionAddress;
846 try {
847 connectionAddress = connectionAddressChecked(conn);
848 } catch (ClosedChannelException e) {
849 promise.setFailure(e);
850 return;
851 }
852 int res = 0;
853 if (read) {
854 res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_READ, err);
855 }
856 if (write) {
857 res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_WRITE, err);
858 }
859
860
861
862
863
864 if (connectionSend(conn) != SendResult.NONE) {
865
866 forceFlushParent();
867 }
868 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
869 promise.setFailure(Quiche.convertToException(res));
870 } else {
871 promise.setSuccess();
872 }
873 }
874
875 void streamSendFin(long streamId) throws Exception {
876 QuicheQuicConnection conn = connection;
877 try {
878
879 int res = streamSend0(conn, streamId, Unpooled.EMPTY_BUFFER, true);
880 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
881 throw Quiche.convertToException(res);
882 }
883 } finally {
884
885
886
887
888 if (connectionSend(conn) != SendResult.NONE) {
889 flushParent();
890 }
891 }
892 }
893
894 int streamSend(long streamId, ByteBuf buffer, boolean fin) throws ClosedChannelException {
895 QuicheQuicConnection conn = connection;
896 if (buffer.nioBufferCount() == 1) {
897 return streamSend0(conn, streamId, buffer, fin);
898 }
899 ByteBuffer[] nioBuffers = buffer.nioBuffers();
900 int lastIdx = nioBuffers.length - 1;
901 int res = 0;
902 for (int i = 0; i < lastIdx; i++) {
903 ByteBuffer nioBuffer = nioBuffers[i];
904 while (nioBuffer.hasRemaining()) {
905 int localRes = streamSend(conn, streamId, nioBuffer, false);
906 if (localRes <= 0) {
907 return res;
908 }
909 res += localRes;
910
911 nioBuffer.position(nioBuffer.position() + localRes);
912 }
913 }
914 int localRes = streamSend(conn, streamId, nioBuffers[lastIdx], fin);
915 if (localRes > 0) {
916 res += localRes;
917 }
918 return res;
919 }
920
921 void connectionSendAndFlush() {
922 if (inFireChannelReadCompleteQueue || (reantranceGuard & IN_HANDLE_WRITABLE_STREAMS) != 0) {
923 return;
924 }
925 if (connectionSend(connection) != SendResult.NONE) {
926 flushParent();
927 }
928 }
929
930 private int streamSend0(QuicheQuicConnection conn, long streamId, ByteBuf buffer, boolean fin)
931 throws ClosedChannelException {
932 return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
933 Quiche.readerMemoryAddress(buffer), buffer.readableBytes(), fin);
934 }
935
936 private int streamSend(QuicheQuicConnection conn, long streamId, ByteBuffer buffer, boolean fin)
937 throws ClosedChannelException {
938 return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
939 Quiche.memoryAddressWithPosition(buffer), buffer.remaining(), fin);
940 }
941
942 StreamRecvResult streamRecv(long streamId, ByteBuf buffer) throws Exception {
943 QuicheQuicConnection conn = connection;
944 long connAddr = connectionAddressChecked(conn);
945 if (finBuffer == null) {
946 finBuffer = alloc().directBuffer(1);
947 }
948 if (outErrorCodeBuffer == null) {
949 outErrorCodeBuffer = alloc().directBuffer(8);
950 }
951 outErrorCodeBuffer.setLongLE(0, -1L);
952 int writerIndex = buffer.writerIndex();
953 int recvLen = Quiche.quiche_conn_stream_recv(connAddr, streamId,
954 Quiche.writerMemoryAddress(buffer), buffer.writableBytes(), Quiche.writerMemoryAddress(finBuffer),
955 Quiche.writerMemoryAddress(outErrorCodeBuffer));
956 long errorCode = outErrorCodeBuffer.getLongLE(0);
957 if (recvLen == Quiche.QUICHE_ERR_DONE) {
958 return StreamRecvResult.DONE;
959 } else if (recvLen < 0) {
960 throw Quiche.convertToException(recvLen, errorCode);
961 }
962 buffer.writerIndex(writerIndex + recvLen);
963 return finBuffer.getBoolean(0) ? StreamRecvResult.FIN : StreamRecvResult.OK;
964 }
965
966
967
968
969 void recv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
970 ((QuicChannelUnsafe) unsafe()).connectionRecv(sender, recipient, buffer);
971 }
972
973
974
975
976
977
978 List<ByteBuffer> retiredSourceConnectionId() {
979 QuicheQuicConnection connection = this.connection;
980 if (connection == null || connection.isFreed()) {
981 return Collections.emptyList();
982 }
983 long connAddr = connection.address();
984 assert connAddr != -1;
985 List<ByteBuffer> retiredSourceIds = null;
986 for (;;) {
987 byte[] retired = Quiche.quiche_conn_retired_scid_next(connAddr);
988 if (retired == null) {
989 break;
990 }
991 if (retiredSourceIds == null) {
992 retiredSourceIds = new ArrayList<>();
993 }
994 ByteBuffer retiredId = ByteBuffer.wrap(retired);
995 retiredSourceIds.add(retiredId);
996 sourceConnectionIds.remove(retiredId);
997 }
998 if (retiredSourceIds == null) {
999 return Collections.emptyList();
1000 }
1001 return retiredSourceIds;
1002 }
1003
1004 List<ByteBuffer> newSourceConnectionIds() {
1005 if (connectionIdAddressGenerator != null && resetTokenGenerator != null) {
1006 QuicheQuicConnection connection = this.connection;
1007 if (connection == null || connection.isFreed()) {
1008 return Collections.emptyList();
1009 }
1010 long connAddr = connection.address();
1011
1012
1013 int left = Quiche.quiche_conn_scids_left(connAddr);
1014 if (left > 0) {
1015 QuicConnectionAddress sourceAddr = connection.sourceId();
1016 if (sourceAddr == null) {
1017 return Collections.emptyList();
1018 }
1019 List<ByteBuffer> generatedIds = new ArrayList<>(left);
1020 boolean sendAndFlush = false;
1021 ByteBuffer key = sourceAddr.id();
1022 ByteBuf connIdBuffer = alloc().directBuffer(key.remaining());
1023
1024 byte[] resetTokenArray = new byte[Quic.RESET_TOKEN_LEN];
1025 try {
1026 do {
1027 ByteBuffer srcId = connectionIdAddressGenerator.newId(key.duplicate(), key.remaining())
1028 .asReadOnlyBuffer();
1029 connIdBuffer.clear();
1030 connIdBuffer.writeBytes(srcId.duplicate());
1031 ByteBuffer resetToken = resetTokenGenerator.newResetToken(srcId.duplicate());
1032 resetToken.get(resetTokenArray);
1033 long result = Quiche.quiche_conn_new_scid(
1034 connAddr, Quiche.memoryAddress(connIdBuffer, 0, connIdBuffer.readableBytes()),
1035 connIdBuffer.readableBytes(), resetTokenArray, false, -1);
1036 if (result < 0) {
1037 break;
1038 }
1039 sendAndFlush = true;
1040 generatedIds.add(srcId.duplicate());
1041 sourceConnectionIds.add(srcId);
1042 } while (--left > 0);
1043 } finally {
1044 connIdBuffer.release();
1045 }
1046
1047 if (sendAndFlush) {
1048 connectionSendAndFlush();
1049 }
1050 return generatedIds;
1051 }
1052 }
1053 return Collections.emptyList();
1054 }
1055
1056 void writable() {
1057 QuicheQuicConnection conn = connection;
1058 SendResult result = connectionSend(conn);
1059 handleWritableStreams(conn);
1060 if (connectionSend(conn) == SendResult.SOME) {
1061 result = SendResult.SOME;
1062 }
1063 if (result == SendResult.SOME) {
1064
1065 forceFlushParent();
1066 }
1067 freeIfClosed();
1068 }
1069
1070 long streamCapacity(long streamId) {
1071 QuicheQuicConnection conn = connection;
1072 if (conn.isClosed()) {
1073 return 0;
1074 }
1075 return Quiche.quiche_conn_stream_capacity(conn.address(), streamId);
1076 }
1077
1078 private boolean handleWritableStreams(QuicheQuicConnection conn) {
1079 if (conn.isFreed()) {
1080 return false;
1081 }
1082 reantranceGuard |= IN_HANDLE_WRITABLE_STREAMS;
1083 try {
1084 long connAddr = conn.address();
1085 boolean mayNeedWrite = false;
1086
1087 if (Quiche.quiche_conn_is_established(connAddr) ||
1088 Quiche.quiche_conn_is_in_early_data(connAddr)) {
1089 long writableIterator = Quiche.quiche_conn_writable(connAddr);
1090
1091 int totalWritable = 0;
1092 try {
1093
1094 for (;;) {
1095 int writable = Quiche.quiche_stream_iter_next(
1096 writableIterator, writableStreams);
1097 for (int i = 0; i < writable; i++) {
1098 long streamId = writableStreams[i];
1099 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1100 if (streamChannel != null) {
1101 long capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
1102 if (streamChannel.writable(capacity)) {
1103 mayNeedWrite = true;
1104 }
1105 }
1106 }
1107 if (writable > 0) {
1108 totalWritable += writable;
1109 }
1110 if (writable < writableStreams.length) {
1111
1112 break;
1113 }
1114 }
1115 } finally {
1116 Quiche.quiche_stream_iter_free(writableIterator);
1117 }
1118 writableStreams = growIfNeeded(writableStreams, totalWritable);
1119 }
1120 return mayNeedWrite;
1121 } finally {
1122 reantranceGuard &= ~IN_HANDLE_WRITABLE_STREAMS;
1123 }
1124 }
1125
1126
1127
1128
1129
1130
1131 void recvComplete() {
1132 try {
1133 QuicheQuicConnection conn = connection;
1134 if (conn.isFreed()) {
1135
1136 forceFlushParent();
1137 return;
1138 }
1139 fireChannelReadCompleteIfNeeded();
1140
1141
1142
1143 connectionSend(conn);
1144
1145
1146 forceFlushParent();
1147 freeIfClosed();
1148 } finally {
1149 inFireChannelReadCompleteQueue = false;
1150 }
1151 }
1152
1153 private void fireChannelReadCompleteIfNeeded() {
1154 if (fireChannelReadCompletePending) {
1155 fireChannelReadCompletePending = false;
1156 pipeline().fireChannelReadComplete();
1157 }
1158 }
1159
1160 private void fireExceptionEvents(QuicheQuicConnection conn, Throwable cause) {
1161 if (cause instanceof SSLHandshakeException) {
1162 notifyAboutHandshakeCompletionIfNeeded(conn, (SSLHandshakeException) cause);
1163 }
1164 pipeline().fireExceptionCaught(cause);
1165 }
1166
1167 private boolean runTasksDirectly() {
1168 return sslTaskExecutor == null || sslTaskExecutor == ImmediateExecutor.INSTANCE ||
1169 sslTaskExecutor == ImmediateEventExecutor.INSTANCE;
1170 }
1171
1172 private void runAllTaskSend(QuicheQuicConnection conn, Runnable task) {
1173 sslTaskExecutor.execute(decorateTaskSend(conn, task));
1174 }
1175
1176 private void runAll(QuicheQuicConnection conn, Runnable task) {
1177 do {
1178 task.run();
1179 } while ((task = conn.sslTask()) != null);
1180 }
1181
1182 private Runnable decorateTaskSend(QuicheQuicConnection conn, Runnable task) {
1183 return () -> {
1184 try {
1185 runAll(conn, task);
1186 } finally {
1187
1188 eventLoop().execute(() -> {
1189
1190 if (connectionSend(conn) != SendResult.NONE) {
1191 forceFlushParent();
1192 }
1193 freeIfClosed();
1194 });
1195 }
1196 };
1197 }
1198
/**
 * Send pass that batches the packets produced by {@code quiche_conn_send} and writes them to the
 * parent channel, either as a plain {@link DatagramPacket} (one buffer) or as one segmented packet
 * created by the given allocator (several buffers).
 *
 * <p>Buffers are collected in {@code bufferList}. The collected batch is written out when the send info
 * changed, when the newest packet has a different size than the last collected one, when the maximum
 * number of segments is reached, or when quiche reports a negative result.
 *
 * @param conn                             the connection to send for.
 * @param segmentedDatagramPacketAllocator allocator used to build segmented packets and to size batches.
 * @return {@code SendResult.NONE} if the connection is closed or nothing was written,
 *         {@code SendResult.SOME} if at least one packet was written, or {@code SendResult.CLOSE} when
 *         {@code Quiche.shouldClose} returned {@code true} for an error code.
 */
private SendResult connectionSendSegments(QuicheQuicConnection conn,
        SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
    if (conn.isClosed()) {
        return SendResult.NONE;
    }
    List<ByteBuf> bufferList = new ArrayList<>(segmentedDatagramPacketAllocator.maxNumSegments());
    long connAddr = conn.address();
    int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
    SendResult sendResult = SendResult.NONE;
    boolean close = false;
    try {
        for (;;) {
            int len = calculateSendBufferLength(connAddr, maxDatagramSize);
            ByteBuf out = alloc().directBuffer(len);

            ByteBuffer sendInfo = conn.nextSendInfo();
            // Snapshot of the remote address taken before the send call; the already collected
            // buffers are written to this address even if the send info changes below.
            InetSocketAddress sendToAddress = this.remote;

            int writerIndex = out.writerIndex();
            int written = Quiche.quiche_conn_send(
                    connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
                    Quiche.memoryAddressWithPosition(sendInfo));
            if (written == 0) {
                out.release();
                // Nothing was produced for this buffer; drop it and ask again.
                continue;
            }
            final boolean done;
            if (written < 0) {
                done = true;
                if (written != Quiche.QUICHE_ERR_DONE) {
                    close = Quiche.shouldClose(written);
                    Exception e = Quiche.convertToException(written);
                    if (!tryFailConnectPromise(e)) {
                        // The connect promise did not take the failure, so report it via the pipeline.
                        fireExceptionEvents(conn, e);
                    }
                }
            } else {
                done = false;
            }
            int size = bufferList.size();
            if (done) {
                // The current buffer holds no packet; release it and write what was collected so far.
                out.release();

                switch (size) {
                    case 0:
                        // Nothing collected, nothing to write.
                        break;
                    case 1:
                        // A single buffer goes out as a plain datagram.
                        parent().write(new DatagramPacket(bufferList.get(0), sendToAddress));
                        sendResult = SendResult.SOME;
                        break;
                    default:
                        int segmentSize = segmentSize(bufferList);
                        ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
                        // Several buffers are wrapped into one buffer and sent as a segmented packet.
                        parent().write(segmentedDatagramPacketAllocator.newPacket(
                                compositeBuffer, segmentSize, sendToAddress));
                        sendResult = SendResult.SOME;
                        break;
                }
                bufferList.clear();
                if (close) {
                    sendResult = SendResult.CLOSE;
                }
                return sendResult;
            }
            out.writerIndex(writerIndex + written);

            // -1 means "keep collecting"; any other value means "write the collected batch now".
            int segmentSize = -1;
            if (conn.isSendInfoChanged()) {
                // Refresh the cached addresses from the send info filled in by the send call.
                remote = QuicheSendInfo.getToAddress(sendInfo);
                local = QuicheSendInfo.getFromAddress(sendInfo);

                if (size > 0) {
                    // Buffers collected so far were produced under the previous send info, so they
                    // are written out before the new packet is collected.
                    segmentSize = segmentSize(bufferList);
                }
            } else if (size > 0) {
                int lastReadable = segmentSize(bufferList);
                // Write the batch when the new packet's size differs from the last collected one
                // or when the batch already holds the maximum number of segments.
                if (lastReadable != out.readableBytes() ||
                        size == segmentedDatagramPacketAllocator.maxNumSegments()) {
                    segmentSize = lastReadable;
                }
            }

            // Write the batch collected so far (not including 'out').
            if (segmentSize != -1) {
                final boolean stop;
                if (size == 1) {
                    // A single collected buffer does not need a segmented packet.
                    stop = writePacket(new DatagramPacket(
                            bufferList.get(0), sendToAddress), maxDatagramSize, len);
                } else {
                    // Wrap all collected buffers into one segmented packet.
                    ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
                    stop = writePacket(segmentedDatagramPacketAllocator.newPacket(
                            compositeBuffer, segmentSize, sendToAddress), maxDatagramSize, len);
                }
                bufferList.clear();
                sendResult = SendResult.SOME;

                if (stop) {
                    // writePacket asked to stop. 'out' was already filled by the send call, so it is
                    // either written as its own datagram or released; otherwise it would be lost
                    // or leaked.
                    if (out.isReadable()) {
                        parent().write(new DatagramPacket(out, sendToAddress));
                    } else {
                        out.release();
                    }
                    if (close) {
                        sendResult = SendResult.CLOSE;
                    }
                    return sendResult;
                }
            }

            // Record the list as a touch hint on the buffer (useful when tracking leaks).
            out.touch(bufferList);
            // Keep the packet for the next batch.
            bufferList.add(out);
        }
    } finally {
        // Intentionally empty.
    }
}
1333
1334 private static int segmentSize(List<ByteBuf> bufferList) {
1335 assert !bufferList.isEmpty();
1336 int size = bufferList.size();
1337 return bufferList.get(size - 1).readableBytes();
1338 }
1339
1340 private SendResult connectionSendSimple(QuicheQuicConnection conn) {
1341 if (conn.isClosed()) {
1342 return SendResult.NONE;
1343 }
1344 long connAddr = conn.address();
1345 SendResult sendResult = SendResult.NONE;
1346 boolean close = false;
1347 int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1348 for (;;) {
1349 ByteBuffer sendInfo = conn.nextSendInfo();
1350
1351 int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1352 ByteBuf out = alloc().directBuffer(len);
1353 int writerIndex = out.writerIndex();
1354
1355 int written = Quiche.quiche_conn_send(
1356 connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1357 Quiche.memoryAddressWithPosition(sendInfo));
1358
1359 if (written == 0) {
1360
1361 out.release();
1362 continue;
1363 }
1364 if (written < 0) {
1365 out.release();
1366 if (written != Quiche.QUICHE_ERR_DONE) {
1367 close = Quiche.shouldClose(written);
1368
1369 Exception e = Quiche.convertToException(written);
1370 if (!tryFailConnectPromise(e)) {
1371 fireExceptionEvents(conn, e);
1372 }
1373 }
1374 break;
1375 }
1376 if (conn.isSendInfoChanged()) {
1377
1378 remote = QuicheSendInfo.getToAddress(sendInfo);
1379 local = QuicheSendInfo.getFromAddress(sendInfo);
1380 }
1381 out.writerIndex(writerIndex + written);
1382 boolean stop = writePacket(new DatagramPacket(out, remote), maxDatagramSize, len);
1383 sendResult = SendResult.SOME;
1384 if (stop) {
1385
1386 break;
1387 }
1388 }
1389 if (close) {
1390 sendResult = SendResult.CLOSE;
1391 }
1392 return sendResult;
1393 }
1394
1395 private boolean writePacket(DatagramPacket packet, int maxDatagramSize, int len) {
1396 ChannelFuture future = parent().write(packet);
1397 if (isSendWindowUsed(maxDatagramSize, len)) {
1398
1399 future.addListener(continueSendingListener);
1400 return true;
1401 }
1402 return false;
1403 }
1404
1405 private static boolean isSendWindowUsed(int maxDatagramSize, int len) {
1406 return len < maxDatagramSize;
1407 }
1408
1409 private static int calculateSendBufferLength(long connAddr, int maxDatagramSize) {
1410 int len = Math.min(maxDatagramSize, Quiche.quiche_conn_send_quantum(connAddr));
1411 if (len <= 0) {
1412
1413
1414
1415
1416 return 8;
1417 }
1418 return len;
1419 }
1420
1421
1422
1423
1424
1425 private SendResult connectionSend(QuicheQuicConnection conn) {
1426 if (conn.isFreed()) {
1427 return SendResult.NONE;
1428 }
1429 if ((reantranceGuard & IN_CONNECTION_SEND) != 0) {
1430
1431 notifyEarlyDataReadyIfNeeded(conn);
1432 return SendResult.NONE;
1433 }
1434
1435 reantranceGuard |= IN_CONNECTION_SEND;
1436 try {
1437 SendResult sendResult;
1438 SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator =
1439 config.getSegmentedDatagramPacketAllocator();
1440 if (segmentedDatagramPacketAllocator.maxNumSegments() > 0) {
1441 sendResult = connectionSendSegments(conn, segmentedDatagramPacketAllocator);
1442 } else {
1443 sendResult = connectionSendSimple(conn);
1444 }
1445
1446
1447
1448 Runnable task = conn.sslTask();
1449 if (task != null) {
1450 if (runTasksDirectly()) {
1451
1452 do {
1453 task.run();
1454
1455 notifyEarlyDataReadyIfNeeded(conn);
1456 } while ((task = conn.sslTask()) != null);
1457
1458
1459
1460 eventLoop().execute(new Runnable() {
1461 @Override
1462 public void run() {
1463
1464 if (connectionSend(conn) != SendResult.NONE) {
1465 forceFlushParent();
1466 }
1467 freeIfClosed();
1468 }
1469 });
1470 } else {
1471 runAllTaskSend(conn, task);
1472 }
1473 } else {
1474
1475 notifyEarlyDataReadyIfNeeded(conn);
1476 }
1477
1478
1479 timeoutHandler.scheduleTimeout();
1480 return sendResult;
1481 } finally {
1482 reantranceGuard &= ~IN_CONNECTION_SEND;
1483 }
1484 }
1485
1486 private final class QuicChannelUnsafe extends AbstractChannel.AbstractUnsafe {
1487
1488 void connectStream(QuicStreamType type, @Nullable ChannelHandler handler,
1489 Promise<QuicStreamChannel> promise) {
1490 if (!promise.setUncancellable()) {
1491 return;
1492 }
1493 long streamId = idGenerator.nextStreamId(type == QuicStreamType.BIDIRECTIONAL);
1494
1495 try {
1496 int res = streamSend0(connection, streamId, Unpooled.EMPTY_BUFFER, false);
1497 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
1498 throw Quiche.convertToException(res);
1499 }
1500 } catch (Exception e) {
1501 promise.setFailure(e);
1502 return;
1503 }
1504 if (type == QuicStreamType.UNIDIRECTIONAL) {
1505 UNI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1506 } else {
1507 BIDI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1508 }
1509 QuicheQuicStreamChannel streamChannel = addNewStreamChannel(streamId);
1510 if (handler != null) {
1511 streamChannel.pipeline().addLast(handler);
1512 }
1513 eventLoop().register(streamChannel).addListener((ChannelFuture f) -> {
1514 if (f.isSuccess()) {
1515 promise.setSuccess(streamChannel);
1516 } else {
1517 promise.setFailure(f.cause());
1518 streams.remove(streamId);
1519 }
1520 });
1521 }
1522
1523 @Override
1524 public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
1525 assert eventLoop().inEventLoop();
1526 if (!channelPromise.setUncancellable()) {
1527 return;
1528 }
1529 if (server) {
1530 channelPromise.setFailure(new UnsupportedOperationException());
1531 return;
1532 }
1533
1534 if (connectPromise != null) {
1535 channelPromise.setFailure(new ConnectionPendingException());
1536 return;
1537 }
1538
1539 if (remote instanceof QuicConnectionAddress) {
1540 if (!sourceConnectionIds.isEmpty()) {
1541
1542 channelPromise.setFailure(new AlreadyConnectedException());
1543 return;
1544 }
1545
1546 connectAddress = (QuicConnectionAddress) remote;
1547 connectPromise = channelPromise;
1548
1549
1550 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1551 if (connectTimeoutMillis > 0) {
1552 connectTimeoutFuture = eventLoop().schedule(() -> {
1553 ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1554 if (connectPromise != null && !connectPromise.isDone()
1555 && connectPromise.tryFailure(new ConnectTimeoutException(
1556 "connection timed out: " + remote))) {
1557 close(voidPromise());
1558 }
1559 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1560 }
1561
1562 connectPromise.addListener((ChannelFuture future) -> {
1563 if (future.isCancelled()) {
1564 if (connectTimeoutFuture != null) {
1565 connectTimeoutFuture.cancel(false);
1566 }
1567 connectPromise = null;
1568 close(voidPromise());
1569 }
1570 });
1571
1572 parent().connect(new QuicheQuicChannelAddress(QuicheQuicChannel.this))
1573 .addListener(f -> {
1574 ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1575 if (connectPromise != null && !f.isSuccess()) {
1576 connectPromise.tryFailure(f.cause());
1577
1578 unsafe().closeForcibly();
1579 }
1580 });
1581 return;
1582 }
1583
1584 channelPromise.setFailure(new UnsupportedOperationException());
1585 }
1586
1587 private void fireConnectCloseEventIfNeeded(QuicheQuicConnection conn) {
1588 if (connectionCloseEvent == null && !conn.isFreed()) {
1589 connectionCloseEvent = Quiche.quiche_conn_peer_error(conn.address());
1590 if (connectionCloseEvent != null) {
1591 pipeline().fireUserEventTriggered(connectionCloseEvent);
1592 }
1593 }
1594 }
1595
/**
 * Feeds the readable bytes of a received datagram into quiche and processes whatever that produces.
 *
 * <p>{@code quiche_conn_recv} is called repeatedly until all bytes are consumed, a negative result is
 * returned, or the connection is freed. After each call pending SSL tasks are run inline or handed to
 * the task executor, and {@code processReceived} is invoked when no task had to be offloaded.
 * {@code IN_RECV} is set in {@code reantranceGuard} for the duration of the call.
 *
 * @param sender    the address the datagram came from; becomes the cached remote address.
 * @param recipient the address the datagram was sent to; becomes the cached local address.
 * @param buffer    the datagram payload; its reader index is advanced by the number of consumed bytes.
 */
void connectionRecv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
    QuicheQuicConnection conn = QuicheQuicChannel.this.connection;
    if (conn.isFreed()) {
        return;
    }
    int bufferReadable = buffer.readableBytes();
    if (bufferReadable == 0) {
        // An empty datagram is ignored; nothing is passed to quiche.
        return;
    }

    reantranceGuard |= IN_RECV;
    boolean close = false;
    try {
        ByteBuf tmpBuffer = null;
        // Read-only buffers are copied into a fresh direct buffer first.
        // NOTE(review): presumably because the native recv call may modify the memory it is given - confirm.
        if (buffer.isReadOnly()) {
            tmpBuffer = alloc().directBuffer(buffer.readableBytes());
            tmpBuffer.writeBytes(buffer);
            buffer = tmpBuffer;
        }
        long memoryAddress = Quiche.readerMemoryAddress(buffer);

        ByteBuffer recvInfo = conn.nextRecvInfo();
        QuicheRecvInfo.setRecvInfo(recvInfo, sender, recipient);

        remote = sender;
        local = recipient;

        try {
            do {
                // Hand the remaining bytes to quiche; 'res' is the number of bytes consumed or an error code.
                int res = Quiche.quiche_conn_recv(conn.address(), memoryAddress, bufferReadable,
                        Quiche.memoryAddressWithPosition(recvInfo));
                final boolean done;
                if (res < 0) {
                    done = true;
                    if (res != Quiche.QUICHE_ERR_DONE) {
                        close = Quiche.shouldClose(res);
                        Exception e = Quiche.convertToException(res);
                        if (tryFailConnectPromise(e)) {
                            // The connect promise took the failure; skip further processing.
                            break;
                        }
                        fireExceptionEvents(conn, e);
                    }
                } else {
                    done = false;
                }
                // Run or offload any SSL tasks that were created by the recv call.
                Runnable task = conn.sslTask();
                if (task != null) {
                    if (runTasksDirectly()) {
                        // Run every pending task inline, then process what was received.
                        do {
                            task.run();
                        } while ((task = conn.sslTask()) != null);
                        processReceived(conn);
                    } else {
                        runAllTaskRecv(conn, task);
                    }
                } else {
                    processReceived(conn);
                }

                if (done) {
                    break;
                }
                // Advance past the bytes quiche consumed.
                memoryAddress += res;
                bufferReadable -= res;
            } while (bufferReadable > 0 && !conn.isFreed());
        } finally {
            // Reflect the consumed bytes in the buffer's reader index (address delta since the start).
            buffer.skipBytes((int) (memoryAddress - Quiche.readerMemoryAddress(buffer)));
            if (tmpBuffer != null) {
                tmpBuffer.release();
            }
        }
        if (close) {
            // Quiche.shouldClose(...) flagged the error, so the channel is closed.
            unsafe().close(newPromise());
        }
    } finally {
        reantranceGuard &= ~IN_RECV;
    }
}
1682
1683 private void processReceived(QuicheQuicConnection conn) {
1684
1685 if (handlePendingChannelActive(conn)) {
1686
1687 return;
1688 }
1689
1690 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1691 fireConnectCloseEventIfNeeded(conn);
1692
1693 if (conn.isFreed()) {
1694 return;
1695 }
1696
1697 long connAddr = conn.address();
1698 if (Quiche.quiche_conn_is_established(connAddr) ||
1699 Quiche.quiche_conn_is_in_early_data(connAddr)) {
1700 long uniLeftOld = uniStreamsLeft;
1701 long bidiLeftOld = bidiStreamsLeft;
1702
1703 if (uniLeftOld == 0 || bidiLeftOld == 0) {
1704 long uniLeft = Quiche.quiche_conn_peer_streams_left_uni(connAddr);
1705 long bidiLeft = Quiche.quiche_conn_peer_streams_left_bidi(connAddr);
1706 uniStreamsLeft = uniLeft;
1707 bidiStreamsLeft = bidiLeft;
1708 if (uniLeftOld != uniLeft || bidiLeftOld != bidiLeft) {
1709 pipeline().fireUserEventTriggered(QuicStreamLimitChangedEvent.INSTANCE);
1710 }
1711 }
1712
1713 handlePathEvents(conn);
1714
1715 if (handleWritableStreams(conn)) {
1716
1717 flushParent();
1718 }
1719
1720 datagramReadable = true;
1721 streamReadable = true;
1722
1723 recvDatagram(conn);
1724 recvStream(conn);
1725 }
1726 }
1727
1728 private void handlePathEvents(QuicheQuicConnection conn) {
1729 long event;
1730 while (!conn.isFreed() && (event = Quiche.quiche_conn_path_event_next(conn.address())) > 0) {
1731 try {
1732 int type = Quiche.quiche_path_event_type(event);
1733
1734 if (type == Quiche.QUICHE_PATH_EVENT_NEW) {
1735 Object[] ret = Quiche.quiche_path_event_new(event);
1736 InetSocketAddress local = (InetSocketAddress) ret[0];
1737 InetSocketAddress peer = (InetSocketAddress) ret[1];
1738 pipeline().fireUserEventTriggered(new QuicPathEvent.New(local, peer));
1739 } else if (type == Quiche.QUICHE_PATH_EVENT_VALIDATED) {
1740 Object[] ret = Quiche.quiche_path_event_validated(event);
1741 InetSocketAddress local = (InetSocketAddress) ret[0];
1742 InetSocketAddress peer = (InetSocketAddress) ret[1];
1743 pipeline().fireUserEventTriggered(new QuicPathEvent.Validated(local, peer));
1744 } else if (type == Quiche.QUICHE_PATH_EVENT_FAILED_VALIDATION) {
1745 Object[] ret = Quiche.quiche_path_event_failed_validation(event);
1746 InetSocketAddress local = (InetSocketAddress) ret[0];
1747 InetSocketAddress peer = (InetSocketAddress) ret[1];
1748 pipeline().fireUserEventTriggered(new QuicPathEvent.FailedValidation(local, peer));
1749 } else if (type == Quiche.QUICHE_PATH_EVENT_CLOSED) {
1750 Object[] ret = Quiche.quiche_path_event_closed(event);
1751 InetSocketAddress local = (InetSocketAddress) ret[0];
1752 InetSocketAddress peer = (InetSocketAddress) ret[1];
1753 pipeline().fireUserEventTriggered(new QuicPathEvent.Closed(local, peer));
1754 } else if (type == Quiche.QUICHE_PATH_EVENT_REUSED_SOURCE_CONNECTION_ID) {
1755 Object[] ret = Quiche.quiche_path_event_reused_source_connection_id(event);
1756 Long seq = (Long) ret[0];
1757 InetSocketAddress localOld = (InetSocketAddress) ret[1];
1758 InetSocketAddress peerOld = (InetSocketAddress) ret[2];
1759 InetSocketAddress local = (InetSocketAddress) ret[3];
1760 InetSocketAddress peer = (InetSocketAddress) ret[4];
1761 pipeline().fireUserEventTriggered(
1762 new QuicPathEvent.ReusedSourceConnectionId(seq, localOld, peerOld, local, peer));
1763 } else if (type == Quiche.QUICHE_PATH_EVENT_PEER_MIGRATED) {
1764 Object[] ret = Quiche.quiche_path_event_peer_migrated(event);
1765 InetSocketAddress local = (InetSocketAddress) ret[0];
1766 InetSocketAddress peer = (InetSocketAddress) ret[1];
1767 pipeline().fireUserEventTriggered(new QuicPathEvent.PeerMigrated(local, peer));
1768 }
1769 } finally {
1770 Quiche.quiche_path_event_free(event);
1771 }
1772 }
1773 }
1774
1775 private void runAllTaskRecv(QuicheQuicConnection conn, Runnable task) {
1776 sslTaskExecutor.execute(decorateTaskRecv(conn, task));
1777 }
1778
1779 private Runnable decorateTaskRecv(QuicheQuicConnection conn, Runnable task) {
1780 return () -> {
1781 try {
1782 runAll(conn, task);
1783 } finally {
1784
1785 eventLoop().execute(() -> {
1786 if (!conn.isFreed()) {
1787 processReceived(conn);
1788
1789
1790 if (connectionSend(conn) != SendResult.NONE) {
1791 forceFlushParent();
1792 }
1793
1794 freeIfClosed();
1795 }
1796 });
1797 }
1798 };
1799 }
1800 void recv() {
1801 QuicheQuicConnection conn = connection;
1802 if ((reantranceGuard & IN_RECV) != 0 || conn.isFreed()) {
1803 return;
1804 }
1805
1806 long connAddr = conn.address();
1807
1808 if (!Quiche.quiche_conn_is_established(connAddr) &&
1809 !Quiche.quiche_conn_is_in_early_data(connAddr)) {
1810 return;
1811 }
1812
1813 reantranceGuard |= IN_RECV;
1814 try {
1815 recvDatagram(conn);
1816 recvStream(conn);
1817 } finally {
1818 fireChannelReadCompleteIfNeeded();
1819 reantranceGuard &= ~IN_RECV;
1820 }
1821 }
1822
1823 private void recvStream(QuicheQuicConnection conn) {
1824 if (conn.isFreed()) {
1825 return;
1826 }
1827 long connAddr = conn.address();
1828 long readableIterator = Quiche.quiche_conn_readable(connAddr);
1829 int totalReadable = 0;
1830 if (readableIterator != -1) {
1831 try {
1832
1833 if (recvStreamPending && streamReadable) {
1834 for (;;) {
1835 int readable = Quiche.quiche_stream_iter_next(
1836 readableIterator, readableStreams);
1837 for (int i = 0; i < readable; i++) {
1838 long streamId = readableStreams[i];
1839 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1840 if (streamChannel == null) {
1841 recvStreamPending = false;
1842 fireChannelReadCompletePending = true;
1843 streamChannel = addNewStreamChannel(streamId);
1844 streamChannel.readable();
1845 pipeline().fireChannelRead(streamChannel);
1846 } else {
1847 streamChannel.readable();
1848 }
1849 }
1850 if (readable < readableStreams.length) {
1851
1852 streamReadable = false;
1853 break;
1854 }
1855 if (readable > 0) {
1856 totalReadable += readable;
1857 }
1858 }
1859 }
1860 } finally {
1861 Quiche.quiche_stream_iter_free(readableIterator);
1862 }
1863 readableStreams = growIfNeeded(readableStreams, totalReadable);
1864 }
1865 }
1866
1867 private void recvDatagram(QuicheQuicConnection conn) {
1868 if (!supportsDatagram) {
1869 return;
1870 }
1871 while (recvDatagramPending && datagramReadable && !conn.isFreed()) {
1872 @SuppressWarnings("deprecation")
1873 RecvByteBufAllocator.Handle recvHandle = recvBufAllocHandle();
1874 recvHandle.reset(config());
1875
1876 int numMessagesRead = 0;
1877 do {
1878 long connAddr = conn.address();
1879 int len = Quiche.quiche_conn_dgram_recv_front_len(connAddr);
1880 if (len == Quiche.QUICHE_ERR_DONE) {
1881 datagramReadable = false;
1882 return;
1883 }
1884
1885 ByteBuf datagramBuffer = alloc().directBuffer(len);
1886 recvHandle.attemptedBytesRead(datagramBuffer.writableBytes());
1887 int writerIndex = datagramBuffer.writerIndex();
1888 long memoryAddress = Quiche.writerMemoryAddress(datagramBuffer);
1889
1890 int written = Quiche.quiche_conn_dgram_recv(connAddr,
1891 memoryAddress, datagramBuffer.writableBytes());
1892 if (written < 0) {
1893 datagramBuffer.release();
1894 if (written == Quiche.QUICHE_ERR_DONE) {
1895
1896 datagramReadable = false;
1897 break;
1898 }
1899 pipeline().fireExceptionCaught(Quiche.convertToException(written));
1900 }
1901 recvHandle.lastBytesRead(written);
1902 recvHandle.incMessagesRead(1);
1903 numMessagesRead++;
1904 datagramBuffer.writerIndex(writerIndex + written);
1905 recvDatagramPending = false;
1906 fireChannelReadCompletePending = true;
1907
1908 pipeline().fireChannelRead(datagramBuffer);
1909 } while (recvHandle.continueReading() && !conn.isFreed());
1910 recvHandle.readComplete();
1911
1912
1913 if (numMessagesRead > 0) {
1914 fireChannelReadCompleteIfNeeded();
1915 }
1916 }
1917 }
1918
1919 private boolean handlePendingChannelActive(QuicheQuicConnection conn) {
1920 if (conn.isFreed() || state == ChannelState.CLOSED) {
1921 return true;
1922 }
1923 if (server) {
1924 if (state == ChannelState.OPEN && Quiche.quiche_conn_is_established(conn.address())) {
1925
1926 state = ChannelState.ACTIVE;
1927
1928 pipeline().fireChannelActive();
1929 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1930 fireDatagramExtensionEvent(conn);
1931 }
1932 } else if (connectPromise != null && Quiche.quiche_conn_is_established(conn.address())) {
1933 ChannelPromise promise = connectPromise;
1934 connectPromise = null;
1935 state = ChannelState.ACTIVE;
1936
1937 boolean promiseSet = promise.trySuccess();
1938 pipeline().fireChannelActive();
1939 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1940 fireDatagramExtensionEvent(conn);
1941 if (!promiseSet) {
1942 fireConnectCloseEventIfNeeded(conn);
1943 this.close(this.voidPromise());
1944 return true;
1945 }
1946 }
1947 return false;
1948 }
1949
1950 private void fireDatagramExtensionEvent(QuicheQuicConnection conn) {
1951 if (conn.isClosed()) {
1952 return;
1953 }
1954 long connAddr = conn.address();
1955 int len = Quiche.quiche_conn_dgram_max_writable_len(connAddr);
1956
1957 if (len != Quiche.QUICHE_ERR_DONE) {
1958 pipeline().fireUserEventTriggered(new QuicDatagramExtensionEvent(len));
1959 }
1960 }
1961
1962 private QuicheQuicStreamChannel addNewStreamChannel(long streamId) {
1963 QuicheQuicStreamChannel streamChannel = new QuicheQuicStreamChannel(
1964 QuicheQuicChannel.this, streamId);
1965 QuicheQuicStreamChannel old = streams.put(streamId, streamChannel);
1966 assert old == null;
1967 streamChannel.writable(streamCapacity(streamId));
1968 return streamChannel;
1969 }
1970 }
1971
1972
1973
1974
1975 void finishConnect() {
1976 assert !server;
1977 assert connection != null;
1978 if (connectionSend(connection) != SendResult.NONE) {
1979 flushParent();
1980 }
1981 }
1982
1983 private void notifyEarlyDataReadyIfNeeded(QuicheQuicConnection conn) {
1984 if (!server && !earlyDataReadyNotified &&
1985 !conn.isFreed() && Quiche.quiche_conn_is_in_early_data(conn.address())) {
1986 earlyDataReadyNotified = true;
1987 pipeline().fireUserEventTriggered(SslEarlyDataReadyEvent.INSTANCE);
1988 }
1989 }
1990
1991 private final class TimeoutHandler implements Runnable {
1992 private ScheduledFuture<?> timeoutFuture;
1993
1994 @Override
1995 public void run() {
1996 QuicheQuicConnection conn = connection;
1997 if (conn.isFreed()) {
1998 return;
1999 }
2000 if (!freeIfClosed()) {
2001 long connAddr = conn.address();
2002 timeoutFuture = null;
2003
2004 Quiche.quiche_conn_on_timeout(connAddr);
2005 if (!freeIfClosed()) {
2006
2007
2008 if (connectionSend(conn) != SendResult.NONE) {
2009 flushParent();
2010 }
2011 boolean closed = freeIfClosed();
2012 if (!closed) {
2013
2014 scheduleTimeout();
2015 }
2016 }
2017 }
2018 }
2019
2020
2021
2022 void scheduleTimeout() {
2023 QuicheQuicConnection conn = connection;
2024 if (conn.isFreed()) {
2025 cancel();
2026 return;
2027 }
2028 if (conn.isClosed()) {
2029 cancel();
2030 unsafe().close(newPromise());
2031 return;
2032 }
2033 long nanos = Quiche.quiche_conn_timeout_as_nanos(conn.address());
2034 if (nanos < 0 || nanos == Long.MAX_VALUE) {
2035
2036 cancel();
2037 return;
2038 }
2039 if (timeoutFuture == null) {
2040 timeoutFuture = eventLoop().schedule(this,
2041 nanos, TimeUnit.NANOSECONDS);
2042 } else {
2043 long remaining = timeoutFuture.getDelay(TimeUnit.NANOSECONDS);
2044 if (remaining <= 0) {
2045
2046
2047 cancel();
2048 run();
2049 } else if (remaining > nanos) {
2050
2051
2052 cancel();
2053 timeoutFuture = eventLoop().schedule(this, nanos, TimeUnit.NANOSECONDS);
2054 }
2055 }
2056 }
2057
2058 void cancel() {
2059 if (timeoutFuture != null) {
2060 timeoutFuture.cancel(false);
2061 timeoutFuture = null;
2062 }
2063 }
2064 }
2065
2066 @Override
2067 public Future<QuicConnectionStats> collectStats(Promise<QuicConnectionStats> promise) {
2068 if (eventLoop().inEventLoop()) {
2069 collectStats0(promise);
2070 } else {
2071 eventLoop().execute(() -> collectStats0(promise));
2072 }
2073 return promise;
2074 }
2075
2076 private void collectStats0(Promise<QuicConnectionStats> promise) {
2077 QuicheQuicConnection conn = connection;
2078 if (conn.isFreed()) {
2079 promise.setSuccess(statsAtClose);
2080 return;
2081 }
2082
2083 collectStats0(connection, promise);
2084 }
2085
2086 @Nullable
2087 private QuicConnectionStats collectStats0(QuicheQuicConnection connection, Promise<QuicConnectionStats> promise) {
2088 final long[] stats = Quiche.quiche_conn_stats(connection.address());
2089 if (stats == null) {
2090 promise.setFailure(new IllegalStateException("native quiche_conn_stats(...) failed"));
2091 return null;
2092 }
2093
2094 final QuicheQuicConnectionStats connStats =
2095 new QuicheQuicConnectionStats(stats);
2096 promise.setSuccess(connStats);
2097 return connStats;
2098 }
2099
2100 @Override
2101 public Future<QuicConnectionPathStats> collectPathStats(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2102 if (eventLoop().inEventLoop()) {
2103 collectPathStats0(pathIdx, promise);
2104 } else {
2105 eventLoop().execute(() -> collectPathStats0(pathIdx, promise));
2106 }
2107 return promise;
2108 }
2109
2110 private void collectPathStats0(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2111 QuicheQuicConnection conn = connection;
2112 if (conn.isFreed()) {
2113 promise.setFailure(new IllegalStateException("Connection is closed"));
2114 return;
2115 }
2116
2117 final Object[] stats = Quiche.quiche_conn_path_stats(connection.address(), pathIdx);
2118 if (stats == null) {
2119 promise.setFailure(new IllegalStateException("native quiche_conn_path_stats(...) failed"));
2120 return;
2121 }
2122 promise.setSuccess(new QuicheQuicConnectionPathStats(stats));
2123 }
2124
2125 @Override
2126 public QuicTransportParameters peerTransportParameters() {
2127 return connection.peerParameters();
2128 }
2129 }