1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.quic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.AbstractChannel;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandler;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.RecvByteBufAllocator;
35 import io.netty.channel.socket.DatagramPacket;
36 import io.netty.handler.ssl.SniCompletionEvent;
37 import io.netty.handler.ssl.SslHandshakeCompletionEvent;
38 import io.netty.util.AttributeKey;
39 import io.netty.util.collection.LongObjectHashMap;
40 import io.netty.util.collection.LongObjectMap;
41 import io.netty.util.concurrent.Future;
42 import io.netty.util.concurrent.ImmediateEventExecutor;
43 import io.netty.util.concurrent.ImmediateExecutor;
44 import io.netty.util.concurrent.Promise;
45 import io.netty.util.internal.StringUtil;
46 import io.netty.util.internal.logging.InternalLogger;
47 import io.netty.util.internal.logging.InternalLoggerFactory;
48 import org.jetbrains.annotations.Nullable;
49
50 import javax.net.ssl.SSLEngine;
51 import javax.net.ssl.SSLHandshakeException;
52 import java.io.File;
53 import java.net.ConnectException;
54 import java.net.InetSocketAddress;
55 import java.net.SocketAddress;
56 import java.nio.BufferUnderflowException;
57 import java.nio.ByteBuffer;
58 import java.nio.channels.AlreadyConnectedException;
59 import java.nio.channels.ClosedChannelException;
60 import java.nio.channels.ConnectionPendingException;
61 import java.util.ArrayList;
62 import java.util.Collections;
63 import java.util.HashSet;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Set;
67 import java.util.concurrent.Executor;
68 import java.util.concurrent.ScheduledFuture;
69 import java.util.concurrent.TimeUnit;
70 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
71 import java.util.function.Consumer;
72 import java.util.function.Function;
73
74
75
76
77 final class QuicheQuicChannel extends AbstractChannel implements QuicChannel {
78 private static final InternalLogger logger = InternalLoggerFactory.getInstance(QuicheQuicChannel.class);
79 private static final String QLOG_FILE_EXTENSION = ".qlog";
80
81 enum StreamRecvResult {
82
83
84
85 DONE,
86
87
88
89 FIN,
90
91
92
93 OK
94 }
95
96 private enum ChannelState {
97 OPEN,
98 ACTIVE,
99 CLOSED
100 }
101
102 private enum SendResult {
103 SOME,
104 NONE,
105 CLOSE
106 }
107
108 private static final class CloseData implements ChannelFutureListener {
109 final boolean applicationClose;
110 final int err;
111 final ByteBuf reason;
112
113 CloseData(boolean applicationClose, int err, ByteBuf reason) {
114 this.applicationClose = applicationClose;
115 this.err = err;
116 this.reason = reason;
117 }
118
119 @Override
120 public void operationComplete(ChannelFuture future) {
121 reason.release();
122 }
123 }
124
125 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
126 private long[] readableStreams = new long[4];
127 private long[] writableStreams = new long[4];
128 private final LongObjectMap<QuicheQuicStreamChannel> streams = new LongObjectHashMap<>();
129 private final QuicheQuicChannelConfig config;
130 private final boolean server;
131 private final QuicStreamIdGenerator idGenerator;
132 private final ChannelHandler streamHandler;
133 private final Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray;
134 private final Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray;
135 private final TimeoutHandler timeoutHandler;
136 private final QuicConnectionIdGenerator connectionIdAddressGenerator;
137 private final QuicResetTokenGenerator resetTokenGenerator;
138 private final Set<ByteBuffer> sourceConnectionIds = new HashSet<>();
139
140 private Consumer<QuicheQuicChannel> freeTask;
141 private Executor sslTaskExecutor;
142 private boolean inFireChannelReadCompleteQueue;
143 private boolean fireChannelReadCompletePending;
144 private ByteBuf finBuffer;
145 private ByteBuf outErrorCodeBuffer;
146 private ChannelPromise connectPromise;
147 private ScheduledFuture<?> connectTimeoutFuture;
148 private QuicConnectionAddress connectLocalAddress;
149 private QuicConnectionAddress connectRemoteAddress;
150 private CloseData closeData;
151 private QuicConnectionCloseEvent connectionCloseEvent;
152 private QuicConnectionStats statsAtClose;
153 private boolean supportsDatagram;
154 private boolean recvDatagramPending;
155 private boolean datagramReadable;
156 private boolean recvStreamPending;
157 private boolean streamReadable;
158 private boolean handshakeCompletionNotified;
159 private boolean earlyDataReadyNotified;
160 private int reantranceGuard;
161 private static final int IN_RECV = 1 << 1;
162 private static final int IN_CONNECTION_SEND = 1 << 2;
163 private static final int IN_HANDLE_WRITABLE_STREAMS = 1 << 3;
164 private volatile ChannelState state = ChannelState.OPEN;
165 private volatile boolean timedOut;
166 private volatile String traceId;
167 private volatile QuicheQuicConnection connection;
168 private volatile InetSocketAddress local;
169 private volatile InetSocketAddress remote;
170
171 private final ChannelFutureListener continueSendingListener = f -> {
172 if (connectionSend(connection) != SendResult.NONE) {
173 flushParent();
174 }
175 };
176
177 private static final AtomicLongFieldUpdater<QuicheQuicChannel> UNI_STREAMS_LEFT_UPDATER =
178 AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "uniStreamsLeft");
179 private volatile long uniStreamsLeft;
180
181 private static final AtomicLongFieldUpdater<QuicheQuicChannel> BIDI_STREAMS_LEFT_UPDATER =
182 AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "bidiStreamsLeft");
183 private volatile long bidiStreamsLeft;
184
185 private QuicheQuicChannel(Channel parent, boolean server, @Nullable ByteBuffer key, InetSocketAddress local,
186 InetSocketAddress remote, boolean supportsDatagram, ChannelHandler streamHandler,
187 Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
188 Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
189 @Nullable Consumer<QuicheQuicChannel> freeTask,
190 @Nullable Executor sslTaskExecutor,
191 @Nullable QuicConnectionIdGenerator connectionIdAddressGenerator,
192 @Nullable QuicResetTokenGenerator resetTokenGenerator) {
193 super(parent);
194 config = new QuicheQuicChannelConfig(this);
195 this.freeTask = freeTask;
196 this.server = server;
197 this.idGenerator = new QuicStreamIdGenerator(server);
198 this.connectionIdAddressGenerator = connectionIdAddressGenerator;
199 this.resetTokenGenerator = resetTokenGenerator;
200 if (key != null) {
201 this.sourceConnectionIds.add(key);
202 }
203
204 this.supportsDatagram = supportsDatagram;
205 this.local = local;
206 this.remote = remote;
207
208 this.streamHandler = streamHandler;
209 this.streamOptionsArray = streamOptionsArray;
210 this.streamAttrsArray = streamAttrsArray;
211 timeoutHandler = new TimeoutHandler();
212 this.sslTaskExecutor = sslTaskExecutor == null ? ImmediateExecutor.INSTANCE : sslTaskExecutor;
213 }
214
215 static QuicheQuicChannel forClient(Channel parent, InetSocketAddress local, InetSocketAddress remote,
216 ChannelHandler streamHandler,
217 Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
218 Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray) {
219 return new QuicheQuicChannel(parent, false, null, local, remote, false, streamHandler,
220 streamOptionsArray, streamAttrsArray, null, null, null, null);
221 }
222
223 static QuicheQuicChannel forServer(Channel parent, ByteBuffer key, InetSocketAddress local,
224 InetSocketAddress remote,
225 boolean supportsDatagram, ChannelHandler streamHandler,
226 Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
227 Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
228 Consumer<QuicheQuicChannel> freeTask, Executor sslTaskExecutor,
229 QuicConnectionIdGenerator connectionIdAddressGenerator,
230 QuicResetTokenGenerator resetTokenGenerator) {
231 return new QuicheQuicChannel(parent, true, key, local, remote, supportsDatagram,
232 streamHandler, streamOptionsArray, streamAttrsArray, freeTask,
233 sslTaskExecutor, connectionIdAddressGenerator, resetTokenGenerator);
234 }
235
236 private static final int MAX_ARRAY_LEN = 128;
237
238 private static long[] growIfNeeded(long[] array, int maxLength) {
239 if (maxLength > array.length) {
240 if (array.length == MAX_ARRAY_LEN) {
241 return array;
242 }
243
244 return new long[Math.min(MAX_ARRAY_LEN, array.length + 4)];
245 }
246 return array;
247 }
248
249 @Override
250 public boolean isTimedOut() {
251 return timedOut;
252 }
253
254 @Override
255 public SSLEngine sslEngine() {
256 QuicheQuicConnection connection = this.connection;
257 return connection == null ? null : connection.engine();
258 }
259
260 private void notifyAboutHandshakeCompletionIfNeeded(QuicheQuicConnection conn,
261 @Nullable SSLHandshakeException cause) {
262 if (handshakeCompletionNotified) {
263 return;
264 }
265 if (cause != null) {
266 pipeline().fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
267 return;
268 }
269 if (conn.isFreed()) {
270 return;
271 }
272 switch (connection.engine().getHandshakeStatus()) {
273 case NOT_HANDSHAKING:
274 case FINISHED:
275 handshakeCompletionNotified = true;
276 pipeline().fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
277 break;
278 default:
279 break;
280 }
281 }
282
283 @Override
284 public long peerAllowedStreams(QuicStreamType type) {
285 switch (type) {
286 case BIDIRECTIONAL:
287 return bidiStreamsLeft;
288 case UNIDIRECTIONAL:
289 return uniStreamsLeft;
290 default:
291 return 0;
292 }
293 }
294
295 void attachQuicheConnection(QuicheQuicConnection connection) {
296 this.connection = connection;
297
298 byte[] traceId = Quiche.quiche_conn_trace_id(connection.address());
299 if (traceId != null) {
300 this.traceId = new String(traceId);
301 }
302
303 connection.init(local, remote,
304 sniHostname -> pipeline().fireUserEventTriggered(new SniCompletionEvent(sniHostname)));
305
306
307 QLogConfiguration configuration = config.getQLogConfiguration();
308 if (configuration != null) {
309 final String fileName;
310 File file = new File(configuration.path());
311 if (file.isDirectory()) {
312
313 file.mkdir();
314 if (this.traceId != null) {
315 fileName = configuration.path() + File.separatorChar + this.traceId + "-" +
316 id().asShortText() + QLOG_FILE_EXTENSION;
317 } else {
318 fileName = configuration.path() + File.separatorChar + id().asShortText() + QLOG_FILE_EXTENSION;
319 }
320 } else {
321 fileName = configuration.path();
322 }
323
324 if (!Quiche.quiche_conn_set_qlog_path(connection.address(), fileName,
325 configuration.logTitle(), configuration.logDescription())) {
326 logger.info("Unable to create qlog file: {} ", fileName);
327 }
328 }
329 }
330
331 void connectNow(Function<QuicChannel, ? extends QuicSslEngine> engineProvider, Executor sslTaskExecutor,
332 Consumer<QuicheQuicChannel> freeTask, long configAddr, int localConnIdLength,
333 boolean supportsDatagram, ByteBuffer fromSockaddrMemory, ByteBuffer toSockaddrMemory)
334 throws Exception {
335 assert this.connection == null;
336 assert this.traceId == null;
337 assert this.sourceConnectionIds.isEmpty();
338
339 this.sslTaskExecutor = sslTaskExecutor;
340 this.freeTask = freeTask;
341
342 QuicConnectionAddress connectLocalAddress = this.connectLocalAddress;
343
344 if (connectLocalAddress == QuicConnectionAddress.EPHEMERAL) {
345 connectLocalAddress = QuicConnectionAddress.random(localConnIdLength);
346 }
347 ByteBuffer localConnectId = connectLocalAddress.id();
348 if (localConnectId.remaining() != localConnIdLength) {
349 failConnectPromiseAndThrow(new IllegalArgumentException("connectionAddress has length "
350 + localConnectId.remaining()
351 + " instead of " + localConnIdLength));
352 }
353 QuicConnectionAddress connectRemoteAddress = this.connectRemoteAddress;
354
355 QuicSslEngine engine = engineProvider.apply(this);
356 if (!(engine instanceof QuicheQuicSslEngine)) {
357 failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not of type "
358 + QuicheQuicSslEngine.class.getSimpleName()));
359 return;
360 }
361 if (!engine.getUseClientMode()) {
362 failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not create in client mode"));
363 }
364 QuicheQuicSslEngine quicheEngine = (QuicheQuicSslEngine) engine;
365 ByteBuf localIdBuffer = alloc().directBuffer(localConnectId.remaining()).writeBytes(localConnectId.duplicate());
366 ByteBuf remoteIdBuffer;
367 if (connectRemoteAddress != QuicConnectionAddress.EPHEMERAL) {
368 ByteBuffer remoteId = connectRemoteAddress.id();
369 remoteIdBuffer = alloc().directBuffer(remoteId.remaining()).writeBytes(remoteId.duplicate());
370 } else {
371 remoteIdBuffer = null;
372 }
373 try {
374 int fromSockaddrLen = SockaddrIn.setAddress(fromSockaddrMemory, local);
375 int toSockaddrLen = SockaddrIn.setAddress(toSockaddrMemory, remote);
376 QuicheQuicConnection connection = quicheEngine.createConnection(ssl -> {
377 long localIdMemoryAddress = Quiche.readerMemoryAddress(localIdBuffer);
378 int localIdLength = localIdBuffer.readableBytes();
379
380 final long remoteIdMemoryAddress;
381 final int remoteIdLength;
382 if (remoteIdBuffer == null) {
383 return Quiche.quiche_conn_new_with_tls(localIdMemoryAddress, localIdLength,
384 -1, -1,
385 Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
386 Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
387 configAddr, ssl, false);
388 } else {
389 return Quiche.quiche_conn_new_with_tls_and_client_dcid(localIdMemoryAddress, localIdLength,
390 Quiche.readerMemoryAddress(remoteIdBuffer), remoteIdBuffer.readableBytes(),
391 Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
392 Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
393 configAddr, ssl);
394 }
395 });
396 if (connection == null) {
397 failConnectPromiseAndThrow(new ConnectException());
398 return;
399 }
400 attachQuicheConnection(connection);
401 QuicClientSessionCache sessionCache = quicheEngine.ctx.getSessionCache();
402 if (sessionCache != null) {
403 byte[] sessionBytes = sessionCache
404 .getSession(quicheEngine.getSession().getPeerHost(), quicheEngine.getSession().getPeerPort());
405 if (sessionBytes != null) {
406 Quiche.quiche_conn_set_session(connection.address(), sessionBytes);
407 }
408 }
409 this.supportsDatagram = supportsDatagram;
410 sourceConnectionIds.add(localConnectId);
411 } finally {
412 localIdBuffer.release();
413 if (remoteIdBuffer != null) {
414 remoteIdBuffer.release();
415 }
416 }
417 }
418
419 private void failConnectPromiseAndThrow(Exception e) throws Exception {
420 tryFailConnectPromise(e);
421 throw e;
422 }
423
424 private boolean tryFailConnectPromise(Exception e) {
425 ChannelPromise promise = connectPromise;
426 if (promise != null) {
427 connectPromise = null;
428 promise.tryFailure(e);
429 return true;
430 }
431 return false;
432 }
433
434 Set<ByteBuffer> sourceConnectionIds() {
435 return sourceConnectionIds;
436 }
437
438 boolean markInFireChannelReadCompleteQueue() {
439 if (inFireChannelReadCompleteQueue) {
440 return false;
441 }
442 inFireChannelReadCompleteQueue = true;
443 return true;
444 }
445
446 private void failPendingConnectPromise() {
447 ChannelPromise promise = QuicheQuicChannel.this.connectPromise;
448 if (promise != null) {
449 QuicheQuicChannel.this.connectPromise = null;
450 promise.tryFailure(new QuicClosedChannelException(this.connectionCloseEvent));
451 }
452 }
453
454 void forceClose() {
455 unsafe().close(voidPromise());
456 }
457
458 @Override
459 protected DefaultChannelPipeline newChannelPipeline() {
460 return new DefaultChannelPipeline(this) {
461 @Override
462 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
463 if (msg instanceof QuicStreamChannel) {
464 QuicStreamChannel channel = (QuicStreamChannel) msg;
465 Quic.setupChannel(channel, streamOptionsArray, streamAttrsArray, streamHandler, logger);
466 ctx.channel().eventLoop().register(channel);
467 } else {
468 super.onUnhandledInboundMessage(ctx, msg);
469 }
470 }
471 };
472 }
473
474 @Override
475 public QuicChannel flush() {
476 super.flush();
477 return this;
478 }
479
480 @Override
481 public QuicChannel read() {
482 super.read();
483 return this;
484 }
485
486 @Override
487 public Future<QuicStreamChannel> createStream(QuicStreamType type, @Nullable ChannelHandler handler,
488 Promise<QuicStreamChannel> promise) {
489 if (eventLoop().inEventLoop()) {
490 ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise);
491 } else {
492 eventLoop().execute(() -> ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise));
493 }
494 return promise;
495 }
496
497 @Override
498 public ChannelFuture close(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
499 if (eventLoop().inEventLoop()) {
500 close0(applicationClose, error, reason, promise);
501 } else {
502 eventLoop().execute(() -> close0(applicationClose, error, reason, promise));
503 }
504 return promise;
505 }
506
507 private void close0(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
508 if (closeData == null) {
509 if (!reason.hasMemoryAddress()) {
510
511 ByteBuf copy = alloc().directBuffer(reason.readableBytes()).writeBytes(reason);
512 reason.release();
513 reason = copy;
514 }
515 closeData = new CloseData(applicationClose, error, reason);
516 promise.addListener(closeData);
517 } else {
518
519 reason.release();
520 }
521 close(promise);
522 }
523
524 @Override
525 public String toString() {
526 String traceId = this.traceId;
527 if (traceId == null) {
528 return "()" + super.toString();
529 } else {
530 return '(' + traceId + ')' + super.toString();
531 }
532 }
533
534 @Override
535 protected AbstractUnsafe newUnsafe() {
536 return new QuicChannelUnsafe();
537 }
538
539 @Override
540 protected boolean isCompatible(EventLoop eventLoop) {
541 return parent().eventLoop() == eventLoop;
542 }
543
544 @Override
545 @Nullable
546 protected QuicConnectionAddress localAddress0() {
547 QuicheQuicConnection connection = this.connection;
548 return connection == null ? null : connection.sourceId();
549 }
550
551 @Override
552 @Nullable
553 protected QuicConnectionAddress remoteAddress0() {
554 QuicheQuicConnection connection = this.connection;
555 return connection == null ? null : connection.destinationId();
556 }
557
558 @Override
559 @Nullable
560 public QuicConnectionAddress localAddress() {
561
562 return localAddress0();
563 }
564
565 @Override
566 @Nullable
567 public QuicConnectionAddress remoteAddress() {
568
569 return remoteAddress0();
570 }
571
572 @Override
573 @Nullable
574 public SocketAddress localSocketAddress() {
575 return local;
576 }
577
578 @Override
579 @Nullable
580 public SocketAddress remoteSocketAddress() {
581 return remote;
582 }
583
584 @Override
585 protected void doBind(SocketAddress socketAddress) {
586 throw new UnsupportedOperationException();
587 }
588
589 @Override
590 protected void doDisconnect() throws Exception {
591 doClose();
592 }
593
594 @Override
595 protected void doClose() throws Exception {
596 if (state == ChannelState.CLOSED) {
597 return;
598 }
599 state = ChannelState.CLOSED;
600
601 QuicheQuicConnection conn = this.connection;
602 if (conn == null || conn.isFreed()) {
603 if (closeData != null) {
604 closeData.reason.release();
605 closeData = null;
606 }
607 failPendingConnectPromise();
608 return;
609 }
610
611
612 SendResult sendResult = connectionSend(conn);
613
614 final boolean app;
615 final int err;
616 final ByteBuf reason;
617 if (closeData == null) {
618 app = false;
619 err = 0;
620 reason = Unpooled.EMPTY_BUFFER;
621 } else {
622 app = closeData.applicationClose;
623 err = closeData.err;
624 reason = closeData.reason;
625 closeData = null;
626 }
627
628 failPendingConnectPromise();
629 try {
630 int res = Quiche.quiche_conn_close(conn.address(), app, err,
631 Quiche.readerMemoryAddress(reason), reason.readableBytes());
632 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
633 throw Quiche.convertToException(res);
634 }
635
636
637
638
639 if (connectionSend(conn) == SendResult.SOME) {
640 sendResult = SendResult.SOME;
641 }
642 } finally {
643
644
645
646 statsAtClose = collectStats0(conn, eventLoop().newPromise());
647 try {
648 timedOut = Quiche.quiche_conn_is_timed_out(conn.address());
649
650 closeStreams();
651 if (finBuffer != null) {
652 finBuffer.release();
653 finBuffer = null;
654 }
655 if (outErrorCodeBuffer != null) {
656 outErrorCodeBuffer.release();
657 outErrorCodeBuffer = null;
658 }
659 } finally {
660 if (sendResult == SendResult.SOME) {
661
662 forceFlushParent();
663 } else {
664 flushParent();
665 }
666 conn.free();
667 if (freeTask != null) {
668 freeTask.accept(this);
669 }
670 timeoutHandler.cancel();
671
672 local = null;
673 remote = null;
674 }
675 }
676 }
677
678 @Override
679 protected void doBeginRead() {
680 recvDatagramPending = true;
681 recvStreamPending = true;
682 if (datagramReadable || streamReadable) {
683 ((QuicChannelUnsafe) unsafe()).recv();
684 }
685 }
686
687 @Override
688 protected Object filterOutboundMessage(Object msg) {
689 if (msg instanceof ByteBuf) {
690 return msg;
691 }
692 throw new UnsupportedOperationException("Unsupported message type: " + StringUtil.simpleClassName(msg));
693 }
694
695 @Override
696 protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) throws Exception {
697 if (!supportsDatagram) {
698 throw new UnsupportedOperationException("Datagram extension is not supported");
699 }
700 boolean sendSomething = false;
701 boolean retry = false;
702 QuicheQuicConnection conn = connection;
703 try {
704 for (;;) {
705 ByteBuf buffer = (ByteBuf) channelOutboundBuffer.current();
706 if (buffer == null) {
707 break;
708 }
709
710 int readable = buffer.readableBytes();
711 if (readable == 0) {
712
713 channelOutboundBuffer.remove();
714 continue;
715 }
716
717 final int res;
718 if (!buffer.isDirect() || buffer.nioBufferCount() > 1) {
719 ByteBuf tmpBuffer = alloc().directBuffer(readable);
720 try {
721 tmpBuffer.writeBytes(buffer, buffer.readerIndex(), readable);
722 res = sendDatagram(conn, tmpBuffer);
723 } finally {
724 tmpBuffer.release();
725 }
726 } else {
727 res = sendDatagram(conn, buffer);
728 }
729 if (res >= 0) {
730 channelOutboundBuffer.remove();
731 sendSomething = true;
732 retry = false;
733 } else {
734 if (res == Quiche.QUICHE_ERR_BUFFER_TOO_SHORT) {
735 retry = false;
736 channelOutboundBuffer.remove(new BufferUnderflowException());
737 } else if (res == Quiche.QUICHE_ERR_INVALID_STATE) {
738 throw new UnsupportedOperationException("Remote peer does not support Datagram extension");
739 } else if (res == Quiche.QUICHE_ERR_DONE) {
740 if (retry) {
741
742 for (;;) {
743 if (!channelOutboundBuffer.remove()) {
744
745 return;
746 }
747 }
748 }
749
750 sendSomething = false;
751
752
753 if (connectionSend(conn) != SendResult.NONE) {
754 forceFlushParent();
755 }
756
757 retry = true;
758 } else {
759 throw Quiche.convertToException(res);
760 }
761 }
762 }
763 } finally {
764 if (sendSomething && connectionSend(conn) != SendResult.NONE) {
765 flushParent();
766 }
767 }
768 }
769
770 private static int sendDatagram(QuicheQuicConnection conn, ByteBuf buf) throws ClosedChannelException {
771 return Quiche.quiche_conn_dgram_send(connectionAddressChecked(conn),
772 Quiche.readerMemoryAddress(buf), buf.readableBytes());
773 }
774
775 @Override
776 public QuicChannelConfig config() {
777 return config;
778 }
779
780 @Override
781 public boolean isOpen() {
782 return state != ChannelState.CLOSED;
783 }
784
785 @Override
786 public boolean isActive() {
787 return state == ChannelState.ACTIVE;
788 }
789
790 @Override
791 public ChannelMetadata metadata() {
792 return METADATA;
793 }
794
795
796
797
798
799 private void flushParent() {
800 if (!inFireChannelReadCompleteQueue) {
801 forceFlushParent();
802 }
803 }
804
805
806
807
808 private void forceFlushParent() {
809 parent().flush();
810 }
811
812 private static long connectionAddressChecked(@Nullable QuicheQuicConnection conn) throws ClosedChannelException {
813 if (conn == null || conn.isFreed()) {
814 throw new ClosedChannelException();
815 }
816 return conn.address();
817 }
818
819 boolean freeIfClosed() {
820 QuicheQuicConnection conn = connection;
821 if (conn == null || conn.isFreed()) {
822 return true;
823 }
824 if (conn.isClosed()) {
825 unsafe().close(newPromise());
826 return true;
827 }
828 return false;
829 }
830
831 private void closeStreams() {
832 if (streams.isEmpty()) {
833 return;
834 }
835 final ClosedChannelException closedChannelException;
836 if (isTimedOut()) {
837
838 closedChannelException = new QuicTimeoutClosedChannelException();
839 } else {
840 closedChannelException = new ClosedChannelException();
841 }
842
843
844 for (QuicheQuicStreamChannel stream: streams.values().toArray(new QuicheQuicStreamChannel[0])) {
845 stream.unsafe().close(closedChannelException, voidPromise());
846 }
847 streams.clear();
848 }
849
850 void streamPriority(long streamId, byte priority, boolean incremental) throws Exception {
851 int res = Quiche.quiche_conn_stream_priority(connectionAddressChecked(connection), streamId,
852 priority, incremental);
853 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
854 throw Quiche.convertToException(res);
855 }
856 }
857
858 void streamClosed(long streamId) {
859 streams.remove(streamId);
860 }
861
862 boolean isStreamLocalCreated(long streamId) {
863 return (streamId & 0x1) == (server ? 1 : 0);
864 }
865
866 QuicStreamType streamType(long streamId) {
867 return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
868 }
869
870 void streamShutdown(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
871 QuicheQuicConnection conn = this.connection;
872 final long connectionAddress;
873 try {
874 connectionAddress = connectionAddressChecked(conn);
875 } catch (ClosedChannelException e) {
876 promise.setFailure(e);
877 return;
878 }
879 int res = 0;
880 if (read) {
881 res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_READ, err);
882 }
883 if (write) {
884 res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_WRITE, err);
885 }
886
887
888
889
890
891 if (connectionSend(conn) != SendResult.NONE) {
892
893 forceFlushParent();
894 }
895 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
896 promise.setFailure(Quiche.convertToException(res));
897 } else {
898 promise.setSuccess();
899 }
900 }
901
902 void streamSendFin(long streamId) throws Exception {
903 QuicheQuicConnection conn = connection;
904 try {
905
906 int res = streamSend0(conn, streamId, Unpooled.EMPTY_BUFFER, true);
907 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
908 throw Quiche.convertToException(res);
909 }
910 } finally {
911
912
913
914
915 if (connectionSend(conn) != SendResult.NONE) {
916 flushParent();
917 }
918 }
919 }
920
921 int streamSend(long streamId, ByteBuf buffer, boolean fin) throws ClosedChannelException {
922 QuicheQuicConnection conn = connection;
923 if (buffer.nioBufferCount() == 1) {
924 return streamSend0(conn, streamId, buffer, fin);
925 }
926 ByteBuffer[] nioBuffers = buffer.nioBuffers();
927 int lastIdx = nioBuffers.length - 1;
928 int res = 0;
929 for (int i = 0; i < lastIdx; i++) {
930 ByteBuffer nioBuffer = nioBuffers[i];
931 while (nioBuffer.hasRemaining()) {
932 int localRes = streamSend(conn, streamId, nioBuffer, false);
933 if (localRes <= 0) {
934 return res;
935 }
936 res += localRes;
937
938 nioBuffer.position(nioBuffer.position() + localRes);
939 }
940 }
941 int localRes = streamSend(conn, streamId, nioBuffers[lastIdx], fin);
942 if (localRes > 0) {
943 res += localRes;
944 }
945 return res;
946 }
947
948 void connectionSendAndFlush() {
949 if (inFireChannelReadCompleteQueue || (reantranceGuard & IN_HANDLE_WRITABLE_STREAMS) != 0) {
950 return;
951 }
952 if (connectionSend(connection) != SendResult.NONE) {
953 flushParent();
954 }
955 }
956
957 private int streamSend0(QuicheQuicConnection conn, long streamId, ByteBuf buffer, boolean fin)
958 throws ClosedChannelException {
959 return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
960 Quiche.readerMemoryAddress(buffer), buffer.readableBytes(), fin);
961 }
962
963 private int streamSend(QuicheQuicConnection conn, long streamId, ByteBuffer buffer, boolean fin)
964 throws ClosedChannelException {
965 return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
966 Quiche.memoryAddressWithPosition(buffer), buffer.remaining(), fin);
967 }
968
969 StreamRecvResult streamRecv(long streamId, ByteBuf buffer) throws Exception {
970 QuicheQuicConnection conn = connection;
971 long connAddr = connectionAddressChecked(conn);
972 if (finBuffer == null) {
973 finBuffer = alloc().directBuffer(1);
974 }
975 if (outErrorCodeBuffer == null) {
976 outErrorCodeBuffer = alloc().directBuffer(8);
977 }
978 outErrorCodeBuffer.setLongLE(0, -1L);
979 int writerIndex = buffer.writerIndex();
980 int recvLen = Quiche.quiche_conn_stream_recv(connAddr, streamId,
981 Quiche.writerMemoryAddress(buffer), buffer.writableBytes(), Quiche.writerMemoryAddress(finBuffer),
982 Quiche.writerMemoryAddress(outErrorCodeBuffer));
983 long errorCode = outErrorCodeBuffer.getLongLE(0);
984 if (recvLen == Quiche.QUICHE_ERR_DONE) {
985 return StreamRecvResult.DONE;
986 } else if (recvLen < 0) {
987 throw Quiche.convertToException(recvLen, errorCode);
988 }
989 buffer.writerIndex(writerIndex + recvLen);
990 return finBuffer.getBoolean(0) ? StreamRecvResult.FIN : StreamRecvResult.OK;
991 }
992
993
994
995
996 void recv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
997 ((QuicChannelUnsafe) unsafe()).connectionRecv(sender, recipient, buffer);
998 }
999
1000
1001
1002
1003
1004
1005 List<ByteBuffer> retiredSourceConnectionId() {
1006 QuicheQuicConnection connection = this.connection;
1007 if (connection == null || connection.isFreed()) {
1008 return Collections.emptyList();
1009 }
1010 long connAddr = connection.address();
1011 assert connAddr != -1;
1012 List<ByteBuffer> retiredSourceIds = null;
1013 for (;;) {
1014 byte[] retired = Quiche.quiche_conn_retired_scid_next(connAddr);
1015 if (retired == null) {
1016 break;
1017 }
1018 if (retiredSourceIds == null) {
1019 retiredSourceIds = new ArrayList<>();
1020 }
1021 ByteBuffer retiredId = ByteBuffer.wrap(retired);
1022 retiredSourceIds.add(retiredId);
1023 sourceConnectionIds.remove(retiredId);
1024 }
1025 if (retiredSourceIds == null) {
1026 return Collections.emptyList();
1027 }
1028 return retiredSourceIds;
1029 }
1030
1031 List<ByteBuffer> newSourceConnectionIds() {
1032 if (connectionIdAddressGenerator != null && resetTokenGenerator != null) {
1033 QuicheQuicConnection connection = this.connection;
1034 if (connection == null || connection.isFreed()) {
1035 return Collections.emptyList();
1036 }
1037 long connAddr = connection.address();
1038
1039
1040 int left = Quiche.quiche_conn_scids_left(connAddr);
1041 if (left > 0) {
1042 QuicConnectionAddress sourceAddr = connection.sourceId();
1043 if (sourceAddr == null) {
1044 return Collections.emptyList();
1045 }
1046 List<ByteBuffer> generatedIds = new ArrayList<>(left);
1047 boolean sendAndFlush = false;
1048 ByteBuffer key = sourceAddr.id();
1049 ByteBuf connIdBuffer = alloc().directBuffer(key.remaining());
1050
1051 byte[] resetTokenArray = new byte[Quic.RESET_TOKEN_LEN];
1052 try {
1053 do {
1054 ByteBuffer srcId = connectionIdAddressGenerator.newId(key.duplicate(), key.remaining())
1055 .asReadOnlyBuffer();
1056 connIdBuffer.clear();
1057 connIdBuffer.writeBytes(srcId.duplicate());
1058 ByteBuffer resetToken = resetTokenGenerator.newResetToken(srcId.duplicate());
1059 resetToken.get(resetTokenArray);
1060 long result = Quiche.quiche_conn_new_scid(
1061 connAddr, Quiche.memoryAddress(connIdBuffer, 0, connIdBuffer.readableBytes()),
1062 connIdBuffer.readableBytes(), resetTokenArray, false, -1);
1063 if (result < 0) {
1064 break;
1065 }
1066 sendAndFlush = true;
1067 generatedIds.add(srcId.duplicate());
1068 sourceConnectionIds.add(srcId);
1069 } while (--left > 0);
1070 } finally {
1071 connIdBuffer.release();
1072 }
1073
1074 if (sendAndFlush) {
1075 connectionSendAndFlush();
1076 }
1077 return generatedIds;
1078 }
1079 }
1080 return Collections.emptyList();
1081 }
1082
1083 void writable() {
1084 QuicheQuicConnection conn = connection;
1085 SendResult result = connectionSend(conn);
1086 handleWritableStreams(conn);
1087 if (connectionSend(conn) == SendResult.SOME) {
1088 result = SendResult.SOME;
1089 }
1090 if (result == SendResult.SOME) {
1091
1092 forceFlushParent();
1093 }
1094 freeIfClosed();
1095 }
1096
1097 long streamCapacity(long streamId) {
1098 QuicheQuicConnection conn = connection;
1099 if (conn.isClosed()) {
1100 return 0;
1101 }
1102 return Quiche.quiche_conn_stream_capacity(conn.address(), streamId);
1103 }
1104
1105 private boolean handleWritableStreams(QuicheQuicConnection conn) {
1106 if (conn.isFreed()) {
1107 return false;
1108 }
1109 reantranceGuard |= IN_HANDLE_WRITABLE_STREAMS;
1110 try {
1111 long connAddr = conn.address();
1112 boolean mayNeedWrite = false;
1113
1114 if (Quiche.quiche_conn_is_established(connAddr) ||
1115 Quiche.quiche_conn_is_in_early_data(connAddr)) {
1116 long writableIterator = Quiche.quiche_conn_writable(connAddr);
1117
1118 int totalWritable = 0;
1119 try {
1120
1121 for (;;) {
1122 int writable = Quiche.quiche_stream_iter_next(
1123 writableIterator, writableStreams);
1124 for (int i = 0; i < writable; i++) {
1125 long streamId = writableStreams[i];
1126 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1127 if (streamChannel != null) {
1128 long capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
1129 if (streamChannel.writable(capacity)) {
1130 mayNeedWrite = true;
1131 }
1132 }
1133 }
1134 if (writable > 0) {
1135 totalWritable += writable;
1136 }
1137 if (writable < writableStreams.length) {
1138
1139 break;
1140 }
1141 }
1142 } finally {
1143 Quiche.quiche_stream_iter_free(writableIterator);
1144 }
1145 writableStreams = growIfNeeded(writableStreams, totalWritable);
1146 }
1147 return mayNeedWrite;
1148 } finally {
1149 reantranceGuard &= ~IN_HANDLE_WRITABLE_STREAMS;
1150 }
1151 }
1152
1153
1154
1155
1156
1157
1158 void recvComplete() {
1159 try {
1160 QuicheQuicConnection conn = connection;
1161 if (conn.isFreed()) {
1162
1163 forceFlushParent();
1164 return;
1165 }
1166 fireChannelReadCompleteIfNeeded();
1167
1168
1169
1170 connectionSend(conn);
1171
1172
1173 forceFlushParent();
1174 freeIfClosed();
1175 } finally {
1176 inFireChannelReadCompleteQueue = false;
1177 }
1178 }
1179
1180 private void fireChannelReadCompleteIfNeeded() {
1181 if (fireChannelReadCompletePending) {
1182 fireChannelReadCompletePending = false;
1183 pipeline().fireChannelReadComplete();
1184 }
1185 }
1186
1187 private void fireExceptionEvents(QuicheQuicConnection conn, Throwable cause) {
1188 if (cause instanceof SSLHandshakeException) {
1189 notifyAboutHandshakeCompletionIfNeeded(conn, (SSLHandshakeException) cause);
1190 }
1191 pipeline().fireExceptionCaught(cause);
1192 }
1193
1194 private boolean runTasksDirectly() {
1195 return sslTaskExecutor == null || sslTaskExecutor == ImmediateExecutor.INSTANCE ||
1196 sslTaskExecutor == ImmediateEventExecutor.INSTANCE;
1197 }
1198
1199 private void runAllTaskSend(QuicheQuicConnection conn, Runnable task) {
1200 sslTaskExecutor.execute(decorateTaskSend(conn, task));
1201 }
1202
1203 private void runAll(QuicheQuicConnection conn, Runnable task) {
1204 do {
1205 task.run();
1206 } while ((task = conn.sslTask()) != null);
1207 }
1208
1209 private Runnable decorateTaskSend(QuicheQuicConnection conn, Runnable task) {
1210 return () -> {
1211 try {
1212 runAll(conn, task);
1213 } finally {
1214
1215 eventLoop().execute(() -> {
1216
1217 if (connectionSend(conn) != SendResult.NONE) {
1218 forceFlushParent();
1219 }
1220 freeIfClosed();
1221 });
1222 }
1223 };
1224 }
1225
1226 private SendResult connectionSendSegments(QuicheQuicConnection conn,
1227 SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
1228 if (conn.isClosed()) {
1229 return SendResult.NONE;
1230 }
1231 List<ByteBuf> bufferList = new ArrayList<>(segmentedDatagramPacketAllocator.maxNumSegments());
1232 long connAddr = conn.address();
1233 int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1234 SendResult sendResult = SendResult.NONE;
1235 boolean close = false;
1236 try {
1237 for (;;) {
1238 int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1239 ByteBuf out = alloc().directBuffer(len);
1240
1241 ByteBuffer sendInfo = conn.nextSendInfo();
1242 InetSocketAddress sendToAddress = this.remote;
1243
1244 int writerIndex = out.writerIndex();
1245 int written = Quiche.quiche_conn_send(
1246 connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1247 Quiche.memoryAddressWithPosition(sendInfo));
1248 if (written == 0) {
1249 out.release();
1250
1251 continue;
1252 }
1253 final boolean done;
1254 if (written < 0) {
1255 done = true;
1256 if (written != Quiche.QUICHE_ERR_DONE) {
1257 close = Quiche.shouldClose(written);
1258 Exception e = Quiche.convertToException(written);
1259 if (!tryFailConnectPromise(e)) {
1260
1261 fireExceptionEvents(conn, e);
1262 }
1263 }
1264 } else {
1265 done = false;
1266 }
1267 int size = bufferList.size();
1268 if (done) {
1269
1270 out.release();
1271
1272 switch (size) {
1273 case 0:
1274
1275 break;
1276 case 1:
1277
1278 parent().write(new DatagramPacket(bufferList.get(0), sendToAddress));
1279 sendResult = SendResult.SOME;
1280 break;
1281 default:
1282 int segmentSize = segmentSize(bufferList);
1283 ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
1284
1285 parent().write(segmentedDatagramPacketAllocator.newPacket(
1286 compositeBuffer, segmentSize, sendToAddress));
1287 sendResult = SendResult.SOME;
1288 break;
1289 }
1290 bufferList.clear();
1291 if (close) {
1292 sendResult = SendResult.CLOSE;
1293 }
1294 return sendResult;
1295 }
1296 out.writerIndex(writerIndex + written);
1297
1298 int segmentSize = -1;
1299 if (conn.isSendInfoChanged()) {
1300
1301 remote = QuicheSendInfo.getToAddress(sendInfo);
1302 local = QuicheSendInfo.getFromAddress(sendInfo);
1303
1304 if (size > 0) {
1305
1306
1307 segmentSize = segmentSize(bufferList);
1308 }
1309 } else if (size > 0) {
1310 int lastReadable = segmentSize(bufferList);
1311
1312
1313 if (lastReadable != out.readableBytes() ||
1314 size == segmentedDatagramPacketAllocator.maxNumSegments()) {
1315 segmentSize = lastReadable;
1316 }
1317 }
1318
1319
1320 if (segmentSize != -1) {
1321 final boolean stop;
1322 if (size == 1) {
1323
1324 stop = writePacket(new DatagramPacket(
1325 bufferList.get(0), sendToAddress), maxDatagramSize, len);
1326 } else {
1327
1328 ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
1329 stop = writePacket(segmentedDatagramPacketAllocator.newPacket(
1330 compositeBuffer, segmentSize, sendToAddress), maxDatagramSize, len);
1331 }
1332 bufferList.clear();
1333 sendResult = SendResult.SOME;
1334
1335 if (stop) {
1336
1337
1338
1339 if (out.isReadable()) {
1340 parent().write(new DatagramPacket(out, sendToAddress));
1341 } else {
1342 out.release();
1343 }
1344 if (close) {
1345 sendResult = SendResult.CLOSE;
1346 }
1347 return sendResult;
1348 }
1349 }
1350
1351
1352 out.touch(bufferList);
1353
1354 bufferList.add(out);
1355 }
1356 } finally {
1357
1358 }
1359 }
1360
1361 private static int segmentSize(List<ByteBuf> bufferList) {
1362 assert !bufferList.isEmpty();
1363 int size = bufferList.size();
1364 return bufferList.get(size - 1).readableBytes();
1365 }
1366
1367 private SendResult connectionSendSimple(QuicheQuicConnection conn) {
1368 if (conn.isClosed()) {
1369 return SendResult.NONE;
1370 }
1371 long connAddr = conn.address();
1372 SendResult sendResult = SendResult.NONE;
1373 boolean close = false;
1374 int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1375 for (;;) {
1376 ByteBuffer sendInfo = conn.nextSendInfo();
1377
1378 int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1379 ByteBuf out = alloc().directBuffer(len);
1380 int writerIndex = out.writerIndex();
1381
1382 int written = Quiche.quiche_conn_send(
1383 connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1384 Quiche.memoryAddressWithPosition(sendInfo));
1385
1386 if (written == 0) {
1387
1388 out.release();
1389 continue;
1390 }
1391 if (written < 0) {
1392 out.release();
1393 if (written != Quiche.QUICHE_ERR_DONE) {
1394 close = Quiche.shouldClose(written);
1395
1396 Exception e = Quiche.convertToException(written);
1397 if (!tryFailConnectPromise(e)) {
1398 fireExceptionEvents(conn, e);
1399 }
1400 }
1401 break;
1402 }
1403 if (conn.isSendInfoChanged()) {
1404
1405 remote = QuicheSendInfo.getToAddress(sendInfo);
1406 local = QuicheSendInfo.getFromAddress(sendInfo);
1407 }
1408 out.writerIndex(writerIndex + written);
1409 boolean stop = writePacket(new DatagramPacket(out, remote), maxDatagramSize, len);
1410 sendResult = SendResult.SOME;
1411 if (stop) {
1412
1413 break;
1414 }
1415 }
1416 if (close) {
1417 sendResult = SendResult.CLOSE;
1418 }
1419 return sendResult;
1420 }
1421
1422 private boolean writePacket(DatagramPacket packet, int maxDatagramSize, int len) {
1423 ChannelFuture future = parent().write(packet);
1424 if (isSendWindowUsed(maxDatagramSize, len)) {
1425
1426 future.addListener(continueSendingListener);
1427 return true;
1428 }
1429 return false;
1430 }
1431
1432 private static boolean isSendWindowUsed(int maxDatagramSize, int len) {
1433 return len < maxDatagramSize;
1434 }
1435
1436 private static int calculateSendBufferLength(long connAddr, int maxDatagramSize) {
1437 int len = Math.min(maxDatagramSize, Quiche.quiche_conn_send_quantum(connAddr));
1438 if (len <= 0) {
1439
1440
1441
1442
1443 return 8;
1444 }
1445 return len;
1446 }
1447
1448
1449
1450
1451
1452 private SendResult connectionSend(QuicheQuicConnection conn) {
1453 if (conn.isFreed()) {
1454 return SendResult.NONE;
1455 }
1456 if ((reantranceGuard & IN_CONNECTION_SEND) != 0) {
1457
1458 notifyEarlyDataReadyIfNeeded(conn);
1459 return SendResult.NONE;
1460 }
1461
1462 reantranceGuard |= IN_CONNECTION_SEND;
1463 try {
1464 SendResult sendResult;
1465 SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator =
1466 config.getSegmentedDatagramPacketAllocator();
1467 if (segmentedDatagramPacketAllocator.maxNumSegments() > 0) {
1468 sendResult = connectionSendSegments(conn, segmentedDatagramPacketAllocator);
1469 } else {
1470 sendResult = connectionSendSimple(conn);
1471 }
1472
1473
1474
1475 Runnable task = conn.sslTask();
1476 if (task != null) {
1477 if (runTasksDirectly()) {
1478
1479 do {
1480 task.run();
1481
1482 notifyEarlyDataReadyIfNeeded(conn);
1483 } while ((task = conn.sslTask()) != null);
1484
1485
1486
1487 eventLoop().execute(new Runnable() {
1488 @Override
1489 public void run() {
1490
1491 if (connectionSend(conn) != SendResult.NONE) {
1492 forceFlushParent();
1493 }
1494 freeIfClosed();
1495 }
1496 });
1497 } else {
1498 runAllTaskSend(conn, task);
1499 }
1500 } else {
1501
1502 notifyEarlyDataReadyIfNeeded(conn);
1503 }
1504
1505
1506 timeoutHandler.scheduleTimeout();
1507 return sendResult;
1508 } finally {
1509 reantranceGuard &= ~IN_CONNECTION_SEND;
1510 }
1511 }
1512
1513 private final class QuicChannelUnsafe extends AbstractChannel.AbstractUnsafe {
1514
1515 void connectStream(QuicStreamType type, @Nullable ChannelHandler handler,
1516 Promise<QuicStreamChannel> promise) {
1517 if (!promise.setUncancellable()) {
1518 return;
1519 }
1520 long streamId = idGenerator.nextStreamId(type == QuicStreamType.BIDIRECTIONAL);
1521
1522 try {
1523 int res = streamSend0(connection, streamId, Unpooled.EMPTY_BUFFER, false);
1524 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
1525 throw Quiche.convertToException(res);
1526 }
1527 } catch (Exception e) {
1528 promise.setFailure(e);
1529 return;
1530 }
1531 if (type == QuicStreamType.UNIDIRECTIONAL) {
1532 UNI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1533 } else {
1534 BIDI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1535 }
1536 QuicheQuicStreamChannel streamChannel = addNewStreamChannel(streamId);
1537 if (handler != null) {
1538 streamChannel.pipeline().addLast(handler);
1539 }
1540 eventLoop().register(streamChannel).addListener((ChannelFuture f) -> {
1541 if (f.isSuccess()) {
1542 promise.setSuccess(streamChannel);
1543 } else {
1544 promise.setFailure(f.cause());
1545 streams.remove(streamId);
1546 }
1547 });
1548 }
1549
1550 @Override
1551 public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
1552 assert eventLoop().inEventLoop();
1553 if (!channelPromise.setUncancellable()) {
1554 return;
1555 }
1556 if (server) {
1557 channelPromise.setFailure(new UnsupportedOperationException());
1558 return;
1559 }
1560
1561 if (connectPromise != null) {
1562 channelPromise.setFailure(new ConnectionPendingException());
1563 return;
1564 }
1565
1566 if (remote instanceof QuicConnectionAddress) {
1567 if (!sourceConnectionIds.isEmpty()) {
1568
1569 channelPromise.setFailure(new AlreadyConnectedException());
1570 return;
1571 }
1572
1573 connectLocalAddress = (QuicConnectionAddress) local;
1574 connectRemoteAddress = (QuicConnectionAddress) remote;
1575 connectPromise = channelPromise;
1576
1577
1578 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1579 if (connectTimeoutMillis > 0) {
1580 connectTimeoutFuture = eventLoop().schedule(() -> {
1581 ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1582 if (connectPromise != null && !connectPromise.isDone()
1583 && connectPromise.tryFailure(new ConnectTimeoutException(
1584 "connection timed out: " + local + " -> " + remote))) {
1585 close(voidPromise());
1586 }
1587 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1588 }
1589
1590 connectPromise.addListener((ChannelFuture future) -> {
1591 if (future.isCancelled()) {
1592 if (connectTimeoutFuture != null) {
1593 connectTimeoutFuture.cancel(false);
1594 }
1595 connectPromise = null;
1596 close(voidPromise());
1597 }
1598 });
1599
1600 parent().connect(new QuicheQuicChannelAddress(QuicheQuicChannel.this))
1601 .addListener(f -> {
1602 ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1603 if (connectPromise != null && !f.isSuccess()) {
1604 connectPromise.tryFailure(f.cause());
1605
1606 unsafe().closeForcibly();
1607 }
1608 });
1609 return;
1610 }
1611
1612 channelPromise.setFailure(new UnsupportedOperationException());
1613 }
1614
1615 private void fireConnectCloseEventIfNeeded(QuicheQuicConnection conn) {
1616 if (connectionCloseEvent == null && !conn.isFreed()) {
1617 connectionCloseEvent = Quiche.quiche_conn_peer_error(conn.address());
1618 if (connectionCloseEvent != null) {
1619 pipeline().fireUserEventTriggered(connectionCloseEvent);
1620 }
1621 }
1622 }
1623
1624 void connectionRecv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
1625 QuicheQuicConnection conn = QuicheQuicChannel.this.connection;
1626 if (conn.isFreed()) {
1627 return;
1628 }
1629 int bufferReadable = buffer.readableBytes();
1630 if (bufferReadable == 0) {
1631
1632
1633 return;
1634 }
1635
1636 reantranceGuard |= IN_RECV;
1637 boolean close = false;
1638 try {
1639 ByteBuf tmpBuffer = null;
1640
1641
1642 if (buffer.isReadOnly()) {
1643 tmpBuffer = alloc().directBuffer(buffer.readableBytes());
1644 tmpBuffer.writeBytes(buffer);
1645 buffer = tmpBuffer;
1646 }
1647 long memoryAddress = Quiche.readerMemoryAddress(buffer);
1648
1649 ByteBuffer recvInfo = conn.nextRecvInfo();
1650 QuicheRecvInfo.setRecvInfo(recvInfo, sender, recipient);
1651
1652 remote = sender;
1653 local = recipient;
1654
1655 try {
1656 do {
1657
1658 int res = Quiche.quiche_conn_recv(conn.address(), memoryAddress, bufferReadable,
1659 Quiche.memoryAddressWithPosition(recvInfo));
1660 final boolean done;
1661 if (res < 0) {
1662 done = true;
1663 if (res != Quiche.QUICHE_ERR_DONE) {
1664 close = Quiche.shouldClose(res);
1665 Exception e = Quiche.convertToException(res);
1666 if (tryFailConnectPromise(e)) {
1667 break;
1668 }
1669 fireExceptionEvents(conn, e);
1670 }
1671 } else {
1672 done = false;
1673 }
1674
1675 Runnable task = conn.sslTask();
1676 if (task != null) {
1677 if (runTasksDirectly()) {
1678
1679 do {
1680 task.run();
1681 } while ((task = conn.sslTask()) != null);
1682 processReceived(conn);
1683 } else {
1684 runAllTaskRecv(conn, task);
1685 }
1686 } else {
1687 processReceived(conn);
1688 }
1689
1690 if (done) {
1691 break;
1692 }
1693 memoryAddress += res;
1694 bufferReadable -= res;
1695 } while (bufferReadable > 0 && !conn.isFreed());
1696 } finally {
1697 buffer.skipBytes((int) (memoryAddress - Quiche.readerMemoryAddress(buffer)));
1698 if (tmpBuffer != null) {
1699 tmpBuffer.release();
1700 }
1701 }
1702 if (close) {
1703
1704 unsafe().close(newPromise());
1705 }
1706 } finally {
1707 reantranceGuard &= ~IN_RECV;
1708 }
1709 }
1710
1711 private void processReceived(QuicheQuicConnection conn) {
1712
1713 if (handlePendingChannelActive(conn)) {
1714
1715 return;
1716 }
1717
1718 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1719 fireConnectCloseEventIfNeeded(conn);
1720
1721 if (conn.isFreed()) {
1722 return;
1723 }
1724
1725 long connAddr = conn.address();
1726 if (Quiche.quiche_conn_is_established(connAddr) ||
1727 Quiche.quiche_conn_is_in_early_data(connAddr)) {
1728 long uniLeftOld = uniStreamsLeft;
1729 long bidiLeftOld = bidiStreamsLeft;
1730
1731 if (uniLeftOld == 0 || bidiLeftOld == 0) {
1732 long uniLeft = Quiche.quiche_conn_peer_streams_left_uni(connAddr);
1733 long bidiLeft = Quiche.quiche_conn_peer_streams_left_bidi(connAddr);
1734 uniStreamsLeft = uniLeft;
1735 bidiStreamsLeft = bidiLeft;
1736 if (uniLeftOld != uniLeft || bidiLeftOld != bidiLeft) {
1737 pipeline().fireUserEventTriggered(QuicStreamLimitChangedEvent.INSTANCE);
1738 }
1739 }
1740
1741 handlePathEvents(conn);
1742
1743 if (handleWritableStreams(conn)) {
1744
1745 flushParent();
1746 }
1747
1748 datagramReadable = true;
1749 streamReadable = true;
1750
1751 recvDatagram(conn);
1752 recvStream(conn);
1753 }
1754 }
1755
1756 private void handlePathEvents(QuicheQuicConnection conn) {
1757 long event;
1758 while (!conn.isFreed() && (event = Quiche.quiche_conn_path_event_next(conn.address())) > 0) {
1759 try {
1760 int type = Quiche.quiche_path_event_type(event);
1761
1762 if (type == Quiche.QUICHE_PATH_EVENT_NEW) {
1763 Object[] ret = Quiche.quiche_path_event_new(event);
1764 InetSocketAddress local = (InetSocketAddress) ret[0];
1765 InetSocketAddress peer = (InetSocketAddress) ret[1];
1766 pipeline().fireUserEventTriggered(new QuicPathEvent.New(local, peer));
1767 } else if (type == Quiche.QUICHE_PATH_EVENT_VALIDATED) {
1768 Object[] ret = Quiche.quiche_path_event_validated(event);
1769 InetSocketAddress local = (InetSocketAddress) ret[0];
1770 InetSocketAddress peer = (InetSocketAddress) ret[1];
1771 pipeline().fireUserEventTriggered(new QuicPathEvent.Validated(local, peer));
1772 } else if (type == Quiche.QUICHE_PATH_EVENT_FAILED_VALIDATION) {
1773 Object[] ret = Quiche.quiche_path_event_failed_validation(event);
1774 InetSocketAddress local = (InetSocketAddress) ret[0];
1775 InetSocketAddress peer = (InetSocketAddress) ret[1];
1776 pipeline().fireUserEventTriggered(new QuicPathEvent.FailedValidation(local, peer));
1777 } else if (type == Quiche.QUICHE_PATH_EVENT_CLOSED) {
1778 Object[] ret = Quiche.quiche_path_event_closed(event);
1779 InetSocketAddress local = (InetSocketAddress) ret[0];
1780 InetSocketAddress peer = (InetSocketAddress) ret[1];
1781 pipeline().fireUserEventTriggered(new QuicPathEvent.Closed(local, peer));
1782 } else if (type == Quiche.QUICHE_PATH_EVENT_REUSED_SOURCE_CONNECTION_ID) {
1783 Object[] ret = Quiche.quiche_path_event_reused_source_connection_id(event);
1784 Long seq = (Long) ret[0];
1785 InetSocketAddress localOld = (InetSocketAddress) ret[1];
1786 InetSocketAddress peerOld = (InetSocketAddress) ret[2];
1787 InetSocketAddress local = (InetSocketAddress) ret[3];
1788 InetSocketAddress peer = (InetSocketAddress) ret[4];
1789 pipeline().fireUserEventTriggered(
1790 new QuicPathEvent.ReusedSourceConnectionId(seq, localOld, peerOld, local, peer));
1791 } else if (type == Quiche.QUICHE_PATH_EVENT_PEER_MIGRATED) {
1792 Object[] ret = Quiche.quiche_path_event_peer_migrated(event);
1793 InetSocketAddress local = (InetSocketAddress) ret[0];
1794 InetSocketAddress peer = (InetSocketAddress) ret[1];
1795 pipeline().fireUserEventTriggered(new QuicPathEvent.PeerMigrated(local, peer));
1796 }
1797 } finally {
1798 Quiche.quiche_path_event_free(event);
1799 }
1800 }
1801 }
1802
1803 private void runAllTaskRecv(QuicheQuicConnection conn, Runnable task) {
1804 sslTaskExecutor.execute(decorateTaskRecv(conn, task));
1805 }
1806
1807 private Runnable decorateTaskRecv(QuicheQuicConnection conn, Runnable task) {
1808 return () -> {
1809 try {
1810 runAll(conn, task);
1811 } finally {
1812
1813 eventLoop().execute(() -> {
1814 if (!conn.isFreed()) {
1815 processReceived(conn);
1816
1817
1818 if (connectionSend(conn) != SendResult.NONE) {
1819 forceFlushParent();
1820 }
1821
1822 freeIfClosed();
1823 }
1824 });
1825 }
1826 };
1827 }
1828 void recv() {
1829 QuicheQuicConnection conn = connection;
1830 if ((reantranceGuard & IN_RECV) != 0 || conn.isFreed()) {
1831 return;
1832 }
1833
1834 long connAddr = conn.address();
1835
1836 if (!Quiche.quiche_conn_is_established(connAddr) &&
1837 !Quiche.quiche_conn_is_in_early_data(connAddr)) {
1838 return;
1839 }
1840
1841 reantranceGuard |= IN_RECV;
1842 try {
1843 recvDatagram(conn);
1844 recvStream(conn);
1845 } finally {
1846 fireChannelReadCompleteIfNeeded();
1847 reantranceGuard &= ~IN_RECV;
1848 }
1849 }
1850
1851 private void recvStream(QuicheQuicConnection conn) {
1852 if (conn.isFreed()) {
1853 return;
1854 }
1855 long connAddr = conn.address();
1856 long readableIterator = Quiche.quiche_conn_readable(connAddr);
1857 int totalReadable = 0;
1858 if (readableIterator != -1) {
1859 try {
1860
1861 if (recvStreamPending && streamReadable) {
1862 for (;;) {
1863 int readable = Quiche.quiche_stream_iter_next(
1864 readableIterator, readableStreams);
1865 for (int i = 0; i < readable; i++) {
1866 long streamId = readableStreams[i];
1867 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1868 if (streamChannel == null) {
1869 recvStreamPending = false;
1870 fireChannelReadCompletePending = true;
1871 streamChannel = addNewStreamChannel(streamId);
1872 streamChannel.readable();
1873 pipeline().fireChannelRead(streamChannel);
1874 } else {
1875 streamChannel.readable();
1876 }
1877 }
1878 if (readable < readableStreams.length) {
1879
1880 streamReadable = false;
1881 break;
1882 }
1883 if (readable > 0) {
1884 totalReadable += readable;
1885 }
1886 }
1887 }
1888 } finally {
1889 Quiche.quiche_stream_iter_free(readableIterator);
1890 }
1891 readableStreams = growIfNeeded(readableStreams, totalReadable);
1892 }
1893 }
1894
1895 private void recvDatagram(QuicheQuicConnection conn) {
1896 if (!supportsDatagram) {
1897 return;
1898 }
1899 while (recvDatagramPending && datagramReadable && !conn.isFreed()) {
1900 @SuppressWarnings("deprecation")
1901 RecvByteBufAllocator.Handle recvHandle = recvBufAllocHandle();
1902 recvHandle.reset(config());
1903
1904 int numMessagesRead = 0;
1905 do {
1906 long connAddr = conn.address();
1907 int len = Quiche.quiche_conn_dgram_recv_front_len(connAddr);
1908 if (len == Quiche.QUICHE_ERR_DONE) {
1909 datagramReadable = false;
1910 return;
1911 }
1912
1913 ByteBuf datagramBuffer = alloc().directBuffer(len);
1914 recvHandle.attemptedBytesRead(datagramBuffer.writableBytes());
1915 int writerIndex = datagramBuffer.writerIndex();
1916 long memoryAddress = Quiche.writerMemoryAddress(datagramBuffer);
1917
1918 int written = Quiche.quiche_conn_dgram_recv(connAddr,
1919 memoryAddress, datagramBuffer.writableBytes());
1920 if (written < 0) {
1921 datagramBuffer.release();
1922 if (written == Quiche.QUICHE_ERR_DONE) {
1923
1924 datagramReadable = false;
1925 break;
1926 }
1927 pipeline().fireExceptionCaught(Quiche.convertToException(written));
1928 }
1929 recvHandle.lastBytesRead(written);
1930 recvHandle.incMessagesRead(1);
1931 numMessagesRead++;
1932 datagramBuffer.writerIndex(writerIndex + written);
1933 recvDatagramPending = false;
1934 fireChannelReadCompletePending = true;
1935
1936 pipeline().fireChannelRead(datagramBuffer);
1937 } while (recvHandle.continueReading() && !conn.isFreed());
1938 recvHandle.readComplete();
1939
1940
1941 if (numMessagesRead > 0) {
1942 fireChannelReadCompleteIfNeeded();
1943 }
1944 }
1945 }
1946
1947 private boolean handlePendingChannelActive(QuicheQuicConnection conn) {
1948 if (conn.isFreed() || state == ChannelState.CLOSED) {
1949 return true;
1950 }
1951 if (server) {
1952 if (state == ChannelState.OPEN && Quiche.quiche_conn_is_established(conn.address())) {
1953
1954 state = ChannelState.ACTIVE;
1955
1956 fireDatagramExtensionEvent(conn);
1957 pipeline().fireChannelActive();
1958 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1959 }
1960 } else if (connectPromise != null && Quiche.quiche_conn_is_established(conn.address())) {
1961 ChannelPromise promise = connectPromise;
1962 connectPromise = null;
1963 state = ChannelState.ACTIVE;
1964
1965 boolean promiseSet = promise.trySuccess();
1966 fireDatagramExtensionEvent(conn);
1967 pipeline().fireChannelActive();
1968 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1969 if (!promiseSet) {
1970 fireConnectCloseEventIfNeeded(conn);
1971 this.close(this.voidPromise());
1972 return true;
1973 }
1974 }
1975 return false;
1976 }
1977
1978 private void fireDatagramExtensionEvent(QuicheQuicConnection conn) {
1979 if (conn.isClosed()) {
1980 return;
1981 }
1982 long connAddr = conn.address();
1983 int len = Quiche.quiche_conn_dgram_max_writable_len(connAddr);
1984
1985 if (len != Quiche.QUICHE_ERR_DONE) {
1986 pipeline().fireUserEventTriggered(new QuicDatagramExtensionEvent(len));
1987 }
1988 }
1989
1990 private QuicheQuicStreamChannel addNewStreamChannel(long streamId) {
1991 QuicheQuicStreamChannel streamChannel = new QuicheQuicStreamChannel(
1992 QuicheQuicChannel.this, streamId);
1993 QuicheQuicStreamChannel old = streams.put(streamId, streamChannel);
1994 assert old == null;
1995 streamChannel.writable(streamCapacity(streamId));
1996 return streamChannel;
1997 }
1998 }
1999
2000
2001
2002
2003 void finishConnect() {
2004 assert !server;
2005 assert connection != null;
2006 if (connectionSend(connection) != SendResult.NONE) {
2007 flushParent();
2008 }
2009 }
2010
2011 private void notifyEarlyDataReadyIfNeeded(QuicheQuicConnection conn) {
2012 if (!server && !earlyDataReadyNotified &&
2013 !conn.isFreed() && Quiche.quiche_conn_is_in_early_data(conn.address())) {
2014 earlyDataReadyNotified = true;
2015 pipeline().fireUserEventTriggered(SslEarlyDataReadyEvent.INSTANCE);
2016 }
2017 }
2018
2019 private final class TimeoutHandler implements Runnable {
2020 private ScheduledFuture<?> timeoutFuture;
2021
2022 @Override
2023 public void run() {
2024 QuicheQuicConnection conn = connection;
2025 if (conn.isFreed()) {
2026 return;
2027 }
2028 if (!freeIfClosed()) {
2029 long connAddr = conn.address();
2030 timeoutFuture = null;
2031
2032 Quiche.quiche_conn_on_timeout(connAddr);
2033 if (!freeIfClosed()) {
2034
2035
2036 if (connectionSend(conn) != SendResult.NONE) {
2037 flushParent();
2038 }
2039 boolean closed = freeIfClosed();
2040 if (!closed) {
2041
2042 scheduleTimeout();
2043 }
2044 }
2045 }
2046 }
2047
2048
2049
2050 void scheduleTimeout() {
2051 QuicheQuicConnection conn = connection;
2052 if (conn.isFreed()) {
2053 cancel();
2054 return;
2055 }
2056 if (conn.isClosed()) {
2057 cancel();
2058 unsafe().close(newPromise());
2059 return;
2060 }
2061 long nanos = Quiche.quiche_conn_timeout_as_nanos(conn.address());
2062 if (nanos < 0 || nanos == Long.MAX_VALUE) {
2063
2064 cancel();
2065 return;
2066 }
2067 if (timeoutFuture == null) {
2068 timeoutFuture = eventLoop().schedule(this,
2069 nanos, TimeUnit.NANOSECONDS);
2070 } else {
2071 long remaining = timeoutFuture.getDelay(TimeUnit.NANOSECONDS);
2072 if (remaining <= 0) {
2073
2074
2075 cancel();
2076 run();
2077 } else if (remaining > nanos) {
2078
2079
2080 cancel();
2081 timeoutFuture = eventLoop().schedule(this, nanos, TimeUnit.NANOSECONDS);
2082 }
2083 }
2084 }
2085
2086 void cancel() {
2087 if (timeoutFuture != null) {
2088 timeoutFuture.cancel(false);
2089 timeoutFuture = null;
2090 }
2091 }
2092 }
2093
2094 @Override
2095 public Future<QuicConnectionStats> collectStats(Promise<QuicConnectionStats> promise) {
2096 if (eventLoop().inEventLoop()) {
2097 collectStats0(promise);
2098 } else {
2099 eventLoop().execute(() -> collectStats0(promise));
2100 }
2101 return promise;
2102 }
2103
2104 private void collectStats0(Promise<QuicConnectionStats> promise) {
2105 QuicheQuicConnection conn = connection;
2106 if (conn.isFreed()) {
2107 promise.setSuccess(statsAtClose);
2108 return;
2109 }
2110
2111 collectStats0(connection, promise);
2112 }
2113
2114 @Nullable
2115 private QuicConnectionStats collectStats0(QuicheQuicConnection connection, Promise<QuicConnectionStats> promise) {
2116 final long[] stats = Quiche.quiche_conn_stats(connection.address());
2117 if (stats == null) {
2118 promise.setFailure(new IllegalStateException("native quiche_conn_stats(...) failed"));
2119 return null;
2120 }
2121
2122 final QuicheQuicConnectionStats connStats =
2123 new QuicheQuicConnectionStats(stats);
2124 promise.setSuccess(connStats);
2125 return connStats;
2126 }
2127
2128 @Override
2129 public Future<QuicConnectionPathStats> collectPathStats(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2130 if (eventLoop().inEventLoop()) {
2131 collectPathStats0(pathIdx, promise);
2132 } else {
2133 eventLoop().execute(() -> collectPathStats0(pathIdx, promise));
2134 }
2135 return promise;
2136 }
2137
2138 private void collectPathStats0(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2139 QuicheQuicConnection conn = connection;
2140 if (conn.isFreed()) {
2141 promise.setFailure(new IllegalStateException("Connection is closed"));
2142 return;
2143 }
2144
2145 final Object[] stats = Quiche.quiche_conn_path_stats(connection.address(), pathIdx);
2146 if (stats == null) {
2147 promise.setFailure(new IllegalStateException("native quiche_conn_path_stats(...) failed"));
2148 return;
2149 }
2150 promise.setSuccess(new QuicheQuicConnectionPathStats(stats));
2151 }
2152
2153 @Override
2154 public QuicTransportParameters peerTransportParameters() {
2155 return connection.peerParameters();
2156 }
2157 }