1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.quic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.AbstractChannel;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandler;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.RecvByteBufAllocator;
35 import io.netty.channel.socket.DatagramPacket;
36 import io.netty.handler.ssl.SniCompletionEvent;
37 import io.netty.handler.ssl.SslHandshakeCompletionEvent;
38 import io.netty.util.AttributeKey;
39 import io.netty.util.collection.LongObjectHashMap;
40 import io.netty.util.collection.LongObjectMap;
41 import io.netty.util.concurrent.Future;
42 import io.netty.util.concurrent.ImmediateEventExecutor;
43 import io.netty.util.concurrent.ImmediateExecutor;
44 import io.netty.util.concurrent.Promise;
45 import io.netty.util.internal.StringUtil;
46 import io.netty.util.internal.logging.InternalLogger;
47 import io.netty.util.internal.logging.InternalLoggerFactory;
48 import org.jetbrains.annotations.Nullable;
49
50 import javax.net.ssl.SSLEngine;
51 import javax.net.ssl.SSLHandshakeException;
52 import java.io.File;
53 import java.net.ConnectException;
54 import java.net.InetSocketAddress;
55 import java.net.SocketAddress;
56 import java.nio.BufferUnderflowException;
57 import java.nio.ByteBuffer;
58 import java.nio.channels.AlreadyConnectedException;
59 import java.nio.channels.ClosedChannelException;
60 import java.nio.channels.ConnectionPendingException;
61 import java.util.ArrayList;
62 import java.util.Collections;
63 import java.util.HashSet;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Set;
67 import java.util.concurrent.Executor;
68 import java.util.concurrent.ScheduledFuture;
69 import java.util.concurrent.TimeUnit;
70 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
71 import java.util.function.Consumer;
72 import java.util.function.Function;
73
74
75
76
77 final class QuicheQuicChannel extends AbstractChannel implements QuicChannel {
78 private static final InternalLogger logger = InternalLoggerFactory.getInstance(QuicheQuicChannel.class);
79 private static final String QLOG_FILE_EXTENSION = ".qlog";
80
/**
 * Outcome of a single {@code streamRecv(...)} call on this channel.
 */
enum StreamRecvResult {
    /** quiche answered the read attempt with {@code QUICHE_ERR_DONE}. */
    DONE,

    /** The read succeeded and quiche set the FIN flag for the stream. */
    FIN,

    /** The read succeeded and the FIN flag was not set. */
    OK
}
95
96 private enum ChannelState {
97 OPEN,
98 ACTIVE,
99 CLOSED
100 }
101
102 private enum SendResult {
103 SOME,
104 NONE,
105 CLOSE
106 }
107
108 private static final class CloseData implements ChannelFutureListener {
109 final boolean applicationClose;
110 final int err;
111 final ByteBuf reason;
112
113 CloseData(boolean applicationClose, int err, ByteBuf reason) {
114 this.applicationClose = applicationClose;
115 this.err = err;
116 this.reason = reason;
117 }
118
119 @Override
120 public void operationComplete(ChannelFuture future) {
121 reason.release();
122 }
123 }
124
125 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
126 private long[] readableStreams = new long[4];
127 private long[] writableStreams = new long[4];
128 private final LongObjectMap<QuicheQuicStreamChannel> streams = new LongObjectHashMap<>();
129 private final QuicheQuicChannelConfig config;
130 private final boolean server;
131 private final QuicStreamIdGenerator idGenerator;
132 private final ChannelHandler streamHandler;
133 private final Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray;
134 private final Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray;
135 private final TimeoutHandler timeoutHandler;
136 private final QuicConnectionIdGenerator connectionIdAddressGenerator;
137 private final QuicResetTokenGenerator resetTokenGenerator;
138 private final Set<ByteBuffer> sourceConnectionIds = new HashSet<>();
139
140 private Consumer<QuicheQuicChannel> freeTask;
141 private Executor sslTaskExecutor;
142 private boolean inFireChannelReadCompleteQueue;
143 private boolean fireChannelReadCompletePending;
144 private ByteBuf finBuffer;
145 private ByteBuf outErrorCodeBuffer;
146 private ChannelPromise connectPromise;
147 private ScheduledFuture<?> connectTimeoutFuture;
148 private QuicConnectionAddress connectLocalAddress;
149 private QuicConnectionAddress connectRemoteAddress;
150 private CloseData closeData;
151 private QuicConnectionCloseEvent connectionCloseEvent;
152 private QuicConnectionStats statsAtClose;
153 private boolean supportsDatagram;
154 private boolean recvDatagramPending;
155 private boolean datagramReadable;
156 private boolean recvStreamPending;
157 private boolean streamReadable;
158 private boolean handshakeCompletionNotified;
159 private boolean earlyDataReadyNotified;
160 private int reantranceGuard;
161 private static final int IN_RECV = 1 << 1;
162 private static final int IN_CONNECTION_SEND = 1 << 2;
163 private static final int IN_HANDLE_WRITABLE_STREAMS = 1 << 3;
164 private volatile ChannelState state = ChannelState.OPEN;
165 private volatile boolean timedOut;
166 private volatile String traceId;
167 private volatile QuicheQuicConnection connection;
168 private volatile InetSocketAddress local;
169 private volatile InetSocketAddress remote;
170
171 private final ChannelFutureListener continueSendingListener = f -> {
172 if (connectionSend(connection) != SendResult.NONE) {
173 flushParent();
174 }
175 };
176
177 private static final AtomicLongFieldUpdater<QuicheQuicChannel> UNI_STREAMS_LEFT_UPDATER =
178 AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "uniStreamsLeft");
179 private volatile long uniStreamsLeft;
180
181 private static final AtomicLongFieldUpdater<QuicheQuicChannel> BIDI_STREAMS_LEFT_UPDATER =
182 AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "bidiStreamsLeft");
183 private volatile long bidiStreamsLeft;
184
/**
 * Creates a channel; use {@code forClient(...)} or {@code forServer(...)} instead of calling this directly.
 *
 * @param parent                       the parent channel; its event loop is the only compatible one
 * @param server                       {@code true} for the server side of a connection
 * @param key                          initial source connection id, or {@code null} if not known yet
 * @param local                        local socket address
 * @param remote                       remote socket address
 * @param supportsDatagram             whether datagram writes are accepted by {@code doWrite(...)}
 * @param streamHandler                handler installed on stream channels that reach the pipeline tail
 * @param streamOptionsArray           options applied to such stream channels
 * @param streamAttrsArray             attributes applied to such stream channels
 * @param freeTask                     callback invoked by {@code doClose()} after the connection is freed
 * @param sslTaskExecutor              executor for SSL tasks; {@code null} selects {@link ImmediateExecutor}
 * @param connectionIdAddressGenerator generator used by {@code newSourceConnectionIds()}, may be {@code null}
 * @param resetTokenGenerator          generator used by {@code newSourceConnectionIds()}, may be {@code null}
 */
private QuicheQuicChannel(Channel parent, boolean server, @Nullable ByteBuffer key, InetSocketAddress local,
                          InetSocketAddress remote, boolean supportsDatagram, ChannelHandler streamHandler,
                          Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
                          Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
                          @Nullable Consumer<QuicheQuicChannel> freeTask,
                          @Nullable Executor sslTaskExecutor,
                          @Nullable QuicConnectionIdGenerator connectionIdAddressGenerator,
                          @Nullable QuicResetTokenGenerator resetTokenGenerator) {
    super(parent);
    config = new QuicheQuicChannelConfig(this);

    // Role and identity.
    this.server = server;
    this.idGenerator = new QuicStreamIdGenerator(server);
    this.freeTask = freeTask;
    this.connectionIdAddressGenerator = connectionIdAddressGenerator;
    this.resetTokenGenerator = resetTokenGenerator;
    if (key != null) {
        this.sourceConnectionIds.add(key);
    }

    // Transport endpoints and capabilities.
    this.local = local;
    this.remote = remote;
    this.supportsDatagram = supportsDatagram;

    // Configuration for stream channels.
    this.streamHandler = streamHandler;
    this.streamOptionsArray = streamOptionsArray;
    this.streamAttrsArray = streamAttrsArray;

    timeoutHandler = new TimeoutHandler();
    if (sslTaskExecutor == null) {
        this.sslTaskExecutor = ImmediateExecutor.INSTANCE;
    } else {
        this.sslTaskExecutor = sslTaskExecutor;
    }
}
214
215 static QuicheQuicChannel forClient(Channel parent, InetSocketAddress local, InetSocketAddress remote,
216 ChannelHandler streamHandler,
217 Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
218 Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray) {
219 return new QuicheQuicChannel(parent, false, null, local, remote, false, streamHandler,
220 streamOptionsArray, streamAttrsArray, null, null, null, null);
221 }
222
223 static QuicheQuicChannel forServer(Channel parent, ByteBuffer key, InetSocketAddress local,
224 InetSocketAddress remote,
225 boolean supportsDatagram, ChannelHandler streamHandler,
226 Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
227 Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
228 Consumer<QuicheQuicChannel> freeTask, Executor sslTaskExecutor,
229 QuicConnectionIdGenerator connectionIdAddressGenerator,
230 QuicResetTokenGenerator resetTokenGenerator) {
231 return new QuicheQuicChannel(parent, true, key, local, remote, supportsDatagram,
232 streamHandler, streamOptionsArray, streamAttrsArray, freeTask,
233 sslTaskExecutor, connectionIdAddressGenerator, resetTokenGenerator);
234 }
235
236 private static final int MAX_ARRAY_LEN = 128;
237
238 private static long[] growIfNeeded(long[] array, int maxLength) {
239 if (maxLength > array.length) {
240 if (array.length == MAX_ARRAY_LEN) {
241 return array;
242 }
243
244 return new long[Math.min(MAX_ARRAY_LEN, array.length + 4)];
245 }
246 return array;
247 }
248
249 @Override
250 public boolean isTimedOut() {
251 return timedOut;
252 }
253
254 @Override
255 public SSLEngine sslEngine() {
256 QuicheQuicConnection connection = this.connection;
257 return connection == null ? null : connection.engine();
258 }
259
260 private void notifyAboutHandshakeCompletionIfNeeded(QuicheQuicConnection conn,
261 @Nullable SSLHandshakeException cause) {
262 if (handshakeCompletionNotified) {
263 return;
264 }
265 if (cause != null) {
266 pipeline().fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
267 return;
268 }
269 if (conn.isFreed()) {
270 return;
271 }
272 switch (connection.engine().getHandshakeStatus()) {
273 case NOT_HANDSHAKING:
274 case FINISHED:
275 handshakeCompletionNotified = true;
276 pipeline().fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
277 break;
278 default:
279 break;
280 }
281 }
282
283 @Override
284 public long peerAllowedStreams(QuicStreamType type) {
285 switch (type) {
286 case BIDIRECTIONAL:
287 return bidiStreamsLeft;
288 case UNIDIRECTIONAL:
289 return uniStreamsLeft;
290 default:
291 return 0;
292 }
293 }
294
295 void attachQuicheConnection(QuicheQuicConnection connection) {
296 this.connection = connection;
297
298 byte[] traceId = Quiche.quiche_conn_trace_id(connection.address());
299 if (traceId != null) {
300 this.traceId = new String(traceId);
301 }
302
303 connection.init(local, remote,
304 sniHostname -> pipeline().fireUserEventTriggered(new SniCompletionEvent(sniHostname)));
305
306
307 QLogConfiguration configuration = config.getQLogConfiguration();
308 if (configuration != null) {
309 final String fileName;
310 File file = new File(configuration.path());
311 if (file.isDirectory()) {
312
313 file.mkdir();
314 if (this.traceId != null) {
315 fileName = configuration.path() + File.separatorChar + this.traceId + "-" +
316 id().asShortText() + QLOG_FILE_EXTENSION;
317 } else {
318 fileName = configuration.path() + File.separatorChar + id().asShortText() + QLOG_FILE_EXTENSION;
319 }
320 } else {
321 fileName = configuration.path();
322 }
323
324 if (!Quiche.quiche_conn_set_qlog_path(connection.address(), fileName,
325 configuration.logTitle(), configuration.logDescription())) {
326 logger.info("Unable to create qlog file: {} ", fileName);
327 }
328 }
329 }
330
331 void connectNow(Function<QuicChannel, ? extends QuicSslEngine> engineProvider, Executor sslTaskExecutor,
332 Consumer<QuicheQuicChannel> freeTask, long configAddr, int localConnIdLength,
333 boolean supportsDatagram, ByteBuffer fromSockaddrMemory, ByteBuffer toSockaddrMemory)
334 throws Exception {
335 assert this.connection == null;
336 assert this.traceId == null;
337 assert this.sourceConnectionIds.isEmpty();
338
339 this.sslTaskExecutor = sslTaskExecutor;
340 this.freeTask = freeTask;
341
342 QuicConnectionAddress connectLocalAddress = this.connectLocalAddress;
343
344 if (connectLocalAddress == QuicConnectionAddress.EPHEMERAL) {
345 connectLocalAddress = QuicConnectionAddress.random(localConnIdLength);
346 }
347 ByteBuffer localConnectId = connectLocalAddress.id();
348 if (localConnectId.remaining() != localConnIdLength) {
349 failConnectPromiseAndThrow(new IllegalArgumentException("connectionAddress has length "
350 + localConnectId.remaining()
351 + " instead of " + localConnIdLength));
352 }
353 QuicConnectionAddress connectRemoteAddress = this.connectRemoteAddress;
354
355 QuicSslEngine engine = engineProvider.apply(this);
356 if (!(engine instanceof QuicheQuicSslEngine)) {
357 failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not of type "
358 + QuicheQuicSslEngine.class.getSimpleName()));
359 return;
360 }
361 if (!engine.getUseClientMode()) {
362 failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not create in client mode"));
363 }
364 QuicheQuicSslEngine quicheEngine = (QuicheQuicSslEngine) engine;
365 ByteBuf localIdBuffer = alloc().directBuffer(localConnectId.remaining()).writeBytes(localConnectId.duplicate());
366 ByteBuf remoteIdBuffer;
367 if (connectRemoteAddress != QuicConnectionAddress.EPHEMERAL) {
368 ByteBuffer remoteId = connectRemoteAddress.id();
369 remoteIdBuffer = alloc().directBuffer(remoteId.remaining()).writeBytes(remoteId.duplicate());
370 } else {
371 remoteIdBuffer = null;
372 }
373 try {
374 int fromSockaddrLen = SockaddrIn.setAddress(fromSockaddrMemory, local);
375 int toSockaddrLen = SockaddrIn.setAddress(toSockaddrMemory, remote);
376 QuicheQuicConnection connection = quicheEngine.createConnection(ssl -> {
377 long localIdMemoryAddress = Quiche.readerMemoryAddress(localIdBuffer);
378 int localIdLength = localIdBuffer.readableBytes();
379
380 final long remoteIdMemoryAddress;
381 final int remoteIdLength;
382 if (remoteIdBuffer == null) {
383 return Quiche.quiche_conn_new_with_tls(localIdMemoryAddress, localIdLength,
384 -1, -1,
385 Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
386 Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
387 configAddr, ssl, false);
388 } else {
389 return Quiche.quiche_conn_new_with_tls_and_client_dcid(localIdMemoryAddress, localIdLength,
390 Quiche.readerMemoryAddress(remoteIdBuffer), remoteIdBuffer.readableBytes(),
391 Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
392 Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
393 configAddr, ssl);
394 }
395 });
396 if (connection == null) {
397 failConnectPromiseAndThrow(new ConnectException());
398 return;
399 }
400 attachQuicheConnection(connection);
401 QuicClientSessionCache sessionCache = quicheEngine.ctx.getSessionCache();
402 if (sessionCache != null) {
403 byte[] sessionBytes = sessionCache
404 .getSession(quicheEngine.getSession().getPeerHost(), quicheEngine.getSession().getPeerPort());
405 if (sessionBytes != null) {
406 Quiche.quiche_conn_set_session(connection.address(), sessionBytes);
407 }
408 }
409 this.supportsDatagram = supportsDatagram;
410 sourceConnectionIds.add(localConnectId);
411 } finally {
412 localIdBuffer.release();
413 if (remoteIdBuffer != null) {
414 remoteIdBuffer.release();
415 }
416 }
417 }
418
419 private void failConnectPromiseAndThrow(Exception e) throws Exception {
420 tryFailConnectPromise(e);
421 throw e;
422 }
423
424 private boolean tryFailConnectPromise(Exception e) {
425 ChannelPromise promise = connectPromise;
426 if (promise != null) {
427 connectPromise = null;
428 promise.tryFailure(e);
429 return true;
430 }
431 return false;
432 }
433
434 Set<ByteBuffer> sourceConnectionIds() {
435 return sourceConnectionIds;
436 }
437
438 boolean markInFireChannelReadCompleteQueue() {
439 if (inFireChannelReadCompleteQueue) {
440 return false;
441 }
442 inFireChannelReadCompleteQueue = true;
443 return true;
444 }
445
446 private void failPendingConnectPromise() {
447 ChannelPromise promise = QuicheQuicChannel.this.connectPromise;
448 if (promise != null) {
449 QuicheQuicChannel.this.connectPromise = null;
450 promise.tryFailure(new QuicClosedChannelException(this.connectionCloseEvent));
451 }
452 }
453
454 void forceClose() {
455 unsafe().close(voidPromise());
456 }
457
458 @Override
459 protected DefaultChannelPipeline newChannelPipeline() {
460 return new DefaultChannelPipeline(this) {
461 @Override
462 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
463 if (msg instanceof QuicStreamChannel) {
464 QuicStreamChannel channel = (QuicStreamChannel) msg;
465 Quic.setupChannel(channel, streamOptionsArray, streamAttrsArray, streamHandler, logger);
466 ctx.channel().eventLoop().register(channel);
467 } else {
468 super.onUnhandledInboundMessage(ctx, msg);
469 }
470 }
471 };
472 }
473
474 @Override
475 public QuicChannel flush() {
476 super.flush();
477 return this;
478 }
479
480 @Override
481 public QuicChannel read() {
482 super.read();
483 return this;
484 }
485
486 @Override
487 public Future<QuicStreamChannel> createStream(QuicStreamType type, @Nullable ChannelHandler handler,
488 Promise<QuicStreamChannel> promise) {
489 if (eventLoop().inEventLoop()) {
490 ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise);
491 } else {
492 eventLoop().execute(() -> ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise));
493 }
494 return promise;
495 }
496
497 @Override
498 public ChannelFuture close(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
499 if (eventLoop().inEventLoop()) {
500 close0(applicationClose, error, reason, promise);
501 } else {
502 eventLoop().execute(() -> close0(applicationClose, error, reason, promise));
503 }
504 return promise;
505 }
506
507 private void close0(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
508 if (closeData == null) {
509 if (!reason.hasMemoryAddress()) {
510
511 ByteBuf copy = alloc().directBuffer(reason.readableBytes()).writeBytes(reason);
512 reason.release();
513 reason = copy;
514 }
515 closeData = new CloseData(applicationClose, error, reason);
516 promise.addListener(closeData);
517 } else {
518
519 reason.release();
520 }
521 close(promise);
522 }
523
524 @Override
525 public String toString() {
526 String traceId = this.traceId;
527 if (traceId == null) {
528 return "()" + super.toString();
529 } else {
530 return '(' + traceId + ')' + super.toString();
531 }
532 }
533
534 @Override
535 protected AbstractUnsafe newUnsafe() {
536 return new QuicChannelUnsafe();
537 }
538
539 @Override
540 protected boolean isCompatible(EventLoop eventLoop) {
541 return parent().eventLoop() == eventLoop;
542 }
543
544 @Override
545 @Nullable
546 protected QuicConnectionAddress localAddress0() {
547 QuicheQuicConnection connection = this.connection;
548 return connection == null ? null : connection.sourceId();
549 }
550
551 @Override
552 @Nullable
553 protected QuicConnectionAddress remoteAddress0() {
554 QuicheQuicConnection connection = this.connection;
555 return connection == null ? null : connection.destinationId();
556 }
557
558 @Override
559 @Nullable
560 public QuicConnectionAddress localAddress() {
561
562 return localAddress0();
563 }
564
565 @Override
566 @Nullable
567 public QuicConnectionAddress remoteAddress() {
568
569 return remoteAddress0();
570 }
571
572 @Override
573 @Nullable
574 public SocketAddress localSocketAddress() {
575 return local;
576 }
577
578 @Override
579 @Nullable
580 public SocketAddress remoteSocketAddress() {
581 return remote;
582 }
583
584 @Override
585 protected void doBind(SocketAddress socketAddress) {
586 throw new UnsupportedOperationException();
587 }
588
589 @Override
590 protected void doDisconnect() throws Exception {
591 doClose();
592 }
593
594 @Override
595 protected void doClose() throws Exception {
596 if (state == ChannelState.CLOSED) {
597 return;
598 }
599 state = ChannelState.CLOSED;
600
601 QuicheQuicConnection conn = this.connection;
602 if (conn == null || conn.isFreed()) {
603 if (closeData != null) {
604 closeData.reason.release();
605 closeData = null;
606 }
607 failPendingConnectPromise();
608 return;
609 }
610
611
612 SendResult sendResult = connectionSend(conn);
613
614 final boolean app;
615 final int err;
616 final ByteBuf reason;
617 if (closeData == null) {
618 app = false;
619 err = 0;
620 reason = Unpooled.EMPTY_BUFFER;
621 } else {
622 app = closeData.applicationClose;
623 err = closeData.err;
624 reason = closeData.reason;
625 closeData = null;
626 }
627
628 failPendingConnectPromise();
629 try {
630 int res = Quiche.quiche_conn_close(conn.address(), app, err,
631 Quiche.readerMemoryAddress(reason), reason.readableBytes());
632 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
633 throw Quiche.convertToException(res);
634 }
635
636
637
638
639 if (connectionSend(conn) == SendResult.SOME) {
640 sendResult = SendResult.SOME;
641 }
642 } finally {
643
644
645
646 statsAtClose = collectStats0(conn, eventLoop().newPromise());
647 try {
648 timedOut = Quiche.quiche_conn_is_timed_out(conn.address());
649
650 closeStreams();
651 if (finBuffer != null) {
652 finBuffer.release();
653 finBuffer = null;
654 }
655 if (outErrorCodeBuffer != null) {
656 outErrorCodeBuffer.release();
657 outErrorCodeBuffer = null;
658 }
659 } finally {
660 if (sendResult == SendResult.SOME) {
661
662 forceFlushParent();
663 } else {
664 flushParent();
665 }
666 conn.free();
667 if (freeTask != null) {
668 freeTask.accept(this);
669 }
670 timeoutHandler.cancel();
671
672 local = null;
673 remote = null;
674 }
675 }
676 }
677
678 @Override
679 protected void doBeginRead() {
680 recvDatagramPending = true;
681 recvStreamPending = true;
682 if (datagramReadable || streamReadable) {
683 ((QuicChannelUnsafe) unsafe()).recv();
684 }
685 }
686
687 @Override
688 protected Object filterOutboundMessage(Object msg) {
689 if (msg instanceof ByteBuf) {
690 return msg;
691 }
692 throw new UnsupportedOperationException("Unsupported message type: " + StringUtil.simpleClassName(msg));
693 }
694
695 @Override
696 protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) throws Exception {
697 if (!supportsDatagram) {
698 throw new UnsupportedOperationException("Datagram extension is not supported");
699 }
700 boolean sendSomething = false;
701 boolean retry = false;
702 QuicheQuicConnection conn = connection;
703 try {
704 for (;;) {
705 ByteBuf buffer = (ByteBuf) channelOutboundBuffer.current();
706 if (buffer == null) {
707 break;
708 }
709
710 int readable = buffer.readableBytes();
711 if (readable == 0) {
712
713 channelOutboundBuffer.remove();
714 continue;
715 }
716
717 final int res;
718 if (!buffer.isDirect() || buffer.nioBufferCount() > 1) {
719 ByteBuf tmpBuffer = alloc().directBuffer(readable);
720 try {
721 tmpBuffer.writeBytes(buffer, buffer.readerIndex(), readable);
722 res = sendDatagram(conn, tmpBuffer);
723 } finally {
724 tmpBuffer.release();
725 }
726 } else {
727 res = sendDatagram(conn, buffer);
728 }
729 if (res >= 0) {
730 channelOutboundBuffer.remove();
731 sendSomething = true;
732 retry = false;
733 } else {
734 if (res == Quiche.QUICHE_ERR_BUFFER_TOO_SHORT) {
735 retry = false;
736 channelOutboundBuffer.remove(new BufferUnderflowException());
737 } else if (res == Quiche.QUICHE_ERR_INVALID_STATE) {
738 throw new UnsupportedOperationException("Remote peer does not support Datagram extension");
739 } else if (res == Quiche.QUICHE_ERR_DONE) {
740 if (retry) {
741
742 for (;;) {
743 if (!channelOutboundBuffer.remove()) {
744
745 return;
746 }
747 }
748 }
749
750 sendSomething = false;
751
752
753 if (connectionSend(conn) != SendResult.NONE) {
754 forceFlushParent();
755 }
756
757 retry = true;
758 } else {
759 throw Quiche.convertToException(res);
760 }
761 }
762 }
763 } finally {
764 if (sendSomething && connectionSend(conn) != SendResult.NONE) {
765 flushParent();
766 }
767 }
768 }
769
770 private static int sendDatagram(QuicheQuicConnection conn, ByteBuf buf) throws ClosedChannelException {
771 return Quiche.quiche_conn_dgram_send(connectionAddressChecked(conn),
772 Quiche.readerMemoryAddress(buf), buf.readableBytes());
773 }
774
775 @Override
776 public QuicChannelConfig config() {
777 return config;
778 }
779
780 @Override
781 public boolean isOpen() {
782 return state != ChannelState.CLOSED;
783 }
784
785 @Override
786 public boolean isActive() {
787 return state == ChannelState.ACTIVE;
788 }
789
790 @Override
791 public ChannelMetadata metadata() {
792 return METADATA;
793 }
794
795
796
797
798
799 private void flushParent() {
800 if (!inFireChannelReadCompleteQueue) {
801 forceFlushParent();
802 }
803 }
804
805
806
807
808 private void forceFlushParent() {
809 parent().flush();
810 }
811
812 private static long connectionAddressChecked(@Nullable QuicheQuicConnection conn) throws ClosedChannelException {
813 if (conn == null || conn.isFreed()) {
814 throw new ClosedChannelException();
815 }
816 return conn.address();
817 }
818
819 boolean freeIfClosed() {
820 QuicheQuicConnection conn = connection;
821 if (conn == null || conn.isFreed()) {
822 return true;
823 }
824 if (conn.isClosed()) {
825 unsafe().close(newPromise());
826 return true;
827 }
828 return false;
829 }
830
831 private void closeStreams() {
832 if (streams.isEmpty()) {
833 return;
834 }
835 final ClosedChannelException closedChannelException;
836 if (isTimedOut()) {
837
838 closedChannelException = new QuicTimeoutClosedChannelException();
839 } else {
840 closedChannelException = new ClosedChannelException();
841 }
842
843
844 for (QuicheQuicStreamChannel stream: streams.values().toArray(new QuicheQuicStreamChannel[0])) {
845 stream.unsafe().close(closedChannelException, voidPromise());
846 }
847 streams.clear();
848 }
849
850 void streamPriority(long streamId, byte priority, boolean incremental) throws Exception {
851 int res = Quiche.quiche_conn_stream_priority(connectionAddressChecked(connection), streamId,
852 priority, incremental);
853 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
854 throw Quiche.convertToException(res);
855 }
856 }
857
858 void streamClosed(long streamId) {
859 streams.remove(streamId);
860 }
861
862 boolean isStreamLocalCreated(long streamId) {
863 return (streamId & 0x1) == (server ? 1 : 0);
864 }
865
866 QuicStreamType streamType(long streamId) {
867 return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
868 }
869
870 void streamShutdown(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
871 QuicheQuicConnection conn = this.connection;
872 final long connectionAddress;
873 try {
874 connectionAddress = connectionAddressChecked(conn);
875 } catch (ClosedChannelException e) {
876 promise.setFailure(e);
877 return;
878 }
879 int res = 0;
880 if (read) {
881 res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_READ, err);
882 }
883 if (write) {
884 res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_WRITE, err);
885 }
886
887
888
889
890
891 if (connectionSend(conn) != SendResult.NONE) {
892
893 forceFlushParent();
894 }
895 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
896 promise.setFailure(Quiche.convertToException(res));
897 } else {
898 promise.setSuccess();
899 }
900 }
901
902 void streamSendFin(long streamId) throws Exception {
903 QuicheQuicConnection conn = connection;
904 try {
905
906 int res = streamSend0(conn, streamId, Unpooled.EMPTY_BUFFER, true);
907 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
908 throw Quiche.convertToException(res);
909 }
910 } finally {
911
912
913
914
915 if (connectionSend(conn) != SendResult.NONE) {
916 flushParent();
917 }
918 }
919 }
920
921 int streamSend(long streamId, ByteBuf buffer, boolean fin) throws ClosedChannelException {
922 QuicheQuicConnection conn = connection;
923 if (buffer.nioBufferCount() == 1) {
924 return streamSend0(conn, streamId, buffer, fin);
925 }
926 ByteBuffer[] nioBuffers = buffer.nioBuffers();
927 int lastIdx = nioBuffers.length - 1;
928 int res = 0;
929 for (int i = 0; i < lastIdx; i++) {
930 ByteBuffer nioBuffer = nioBuffers[i];
931 while (nioBuffer.hasRemaining()) {
932 int localRes = streamSend(conn, streamId, nioBuffer, false);
933 if (localRes <= 0) {
934 return res;
935 }
936 res += localRes;
937
938 nioBuffer.position(nioBuffer.position() + localRes);
939 }
940 }
941 int localRes = streamSend(conn, streamId, nioBuffers[lastIdx], fin);
942 if (localRes > 0) {
943 res += localRes;
944 }
945 return res;
946 }
947
948 void connectionSendAndFlush() {
949 if (inFireChannelReadCompleteQueue || (reantranceGuard & IN_HANDLE_WRITABLE_STREAMS) != 0) {
950 return;
951 }
952 if (connectionSend(connection) != SendResult.NONE) {
953 flushParent();
954 }
955 }
956
957 private int streamSend0(QuicheQuicConnection conn, long streamId, ByteBuf buffer, boolean fin)
958 throws ClosedChannelException {
959 return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
960 Quiche.readerMemoryAddress(buffer), buffer.readableBytes(), fin);
961 }
962
963 private int streamSend(QuicheQuicConnection conn, long streamId, ByteBuffer buffer, boolean fin)
964 throws ClosedChannelException {
965 return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
966 Quiche.memoryAddressWithPosition(buffer), buffer.remaining(), fin);
967 }
968
969 StreamRecvResult streamRecv(long streamId, ByteBuf buffer) throws Exception {
970 QuicheQuicConnection conn = connection;
971 long connAddr = connectionAddressChecked(conn);
972 if (finBuffer == null) {
973 finBuffer = alloc().directBuffer(1);
974 }
975 if (outErrorCodeBuffer == null) {
976 outErrorCodeBuffer = alloc().directBuffer(8);
977 }
978 outErrorCodeBuffer.setLongLE(0, -1L);
979 int writerIndex = buffer.writerIndex();
980 int recvLen = Quiche.quiche_conn_stream_recv(connAddr, streamId,
981 Quiche.writerMemoryAddress(buffer), buffer.writableBytes(), Quiche.writerMemoryAddress(finBuffer),
982 Quiche.writerMemoryAddress(outErrorCodeBuffer));
983 long errorCode = outErrorCodeBuffer.getLongLE(0);
984 if (recvLen == Quiche.QUICHE_ERR_DONE) {
985 return StreamRecvResult.DONE;
986 } else if (recvLen < 0) {
987 throw Quiche.convertToException(recvLen, errorCode);
988 }
989 buffer.writerIndex(writerIndex + recvLen);
990 return finBuffer.getBoolean(0) ? StreamRecvResult.FIN : StreamRecvResult.OK;
991 }
992
993
994
995
996 void recv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
997 ((QuicChannelUnsafe) unsafe()).connectionRecv(sender, recipient, buffer);
998 }
999
1000
1001
1002
1003
1004
1005 List<ByteBuffer> retiredSourceConnectionId() {
1006 QuicheQuicConnection connection = this.connection;
1007 if (connection == null || connection.isFreed()) {
1008 return Collections.emptyList();
1009 }
1010 long connAddr = connection.address();
1011 assert connAddr != -1;
1012 List<ByteBuffer> retiredSourceIds = null;
1013 for (;;) {
1014 byte[] retired = Quiche.quiche_conn_retired_scid_next(connAddr);
1015 if (retired == null) {
1016 break;
1017 }
1018 if (retiredSourceIds == null) {
1019 retiredSourceIds = new ArrayList<>();
1020 }
1021 ByteBuffer retiredId = ByteBuffer.wrap(retired);
1022 retiredSourceIds.add(retiredId);
1023 sourceConnectionIds.remove(retiredId);
1024 }
1025 if (retiredSourceIds == null) {
1026 return Collections.emptyList();
1027 }
1028 return retiredSourceIds;
1029 }
1030
1031 List<ByteBuffer> newSourceConnectionIds() {
1032 if (connectionIdAddressGenerator != null && resetTokenGenerator != null) {
1033 QuicheQuicConnection connection = this.connection;
1034 if (connection == null || connection.isFreed()) {
1035 return Collections.emptyList();
1036 }
1037 long connAddr = connection.address();
1038
1039
1040 int left = Quiche.quiche_conn_scids_left(connAddr);
1041 if (left > 0) {
1042 QuicConnectionAddress sourceAddr = connection.sourceId();
1043 if (sourceAddr == null) {
1044 return Collections.emptyList();
1045 }
1046 List<ByteBuffer> generatedIds = new ArrayList<>(left);
1047 boolean sendAndFlush = false;
1048 ByteBuffer key = sourceAddr.id();
1049 ByteBuf connIdBuffer = alloc().directBuffer(key.remaining());
1050
1051 byte[] resetTokenArray = new byte[Quic.RESET_TOKEN_LEN];
1052 try {
1053 do {
1054 ByteBuffer srcId = connectionIdAddressGenerator.newId(key.duplicate(), key.remaining())
1055 .asReadOnlyBuffer();
1056 connIdBuffer.clear();
1057 connIdBuffer.writeBytes(srcId.duplicate());
1058 ByteBuffer resetToken = resetTokenGenerator.newResetToken(srcId.duplicate());
1059 resetToken.get(resetTokenArray);
1060 long result = Quiche.quiche_conn_new_scid(
1061 connAddr, Quiche.memoryAddress(connIdBuffer, 0, connIdBuffer.readableBytes()),
1062 connIdBuffer.readableBytes(), resetTokenArray, false, -1);
1063 if (result < 0) {
1064 break;
1065 }
1066 sendAndFlush = true;
1067 generatedIds.add(srcId.duplicate());
1068 sourceConnectionIds.add(srcId);
1069 } while (--left > 0);
1070 } finally {
1071 connIdBuffer.release();
1072 }
1073
1074 if (sendAndFlush) {
1075 connectionSendAndFlush();
1076 }
1077 return generatedIds;
1078 }
1079 }
1080 return Collections.emptyList();
1081 }
1082
1083 void writable() {
1084 QuicheQuicConnection conn = connection;
1085 SendResult result = connectionSend(conn);
1086 handleWritableStreams(conn);
1087 if (connectionSend(conn) == SendResult.SOME) {
1088 result = SendResult.SOME;
1089 }
1090 if (result == SendResult.SOME) {
1091
1092 forceFlushParent();
1093 }
1094 freeIfClosed();
1095 }
1096
1097 long streamCapacity(long streamId) {
1098 QuicheQuicConnection conn = connection;
1099 if (conn.isClosed()) {
1100 return 0;
1101 }
1102 return Quiche.quiche_conn_stream_capacity(conn.address(), streamId);
1103 }
1104
1105 private boolean handleWritableStreams(QuicheQuicConnection conn) {
1106 if (conn.isFreed()) {
1107 return false;
1108 }
1109 reantranceGuard |= IN_HANDLE_WRITABLE_STREAMS;
1110 try {
1111 long connAddr = conn.address();
1112 boolean mayNeedWrite = false;
1113
1114 if (Quiche.quiche_conn_is_established(connAddr) ||
1115 Quiche.quiche_conn_is_in_early_data(connAddr)) {
1116 long writableIterator = Quiche.quiche_conn_writable(connAddr);
1117
1118 int totalWritable = 0;
1119 try {
1120
1121 for (;;) {
1122 int writable = Quiche.quiche_stream_iter_next(
1123 writableIterator, writableStreams);
1124 for (int i = 0; i < writable; i++) {
1125 long streamId = writableStreams[i];
1126 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1127 if (streamChannel != null) {
1128 long capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
1129 if (streamChannel.writable(capacity)) {
1130 mayNeedWrite = true;
1131 }
1132 }
1133 }
1134 if (writable > 0) {
1135 totalWritable += writable;
1136 }
1137 if (writable < writableStreams.length) {
1138
1139 break;
1140 }
1141 }
1142 } finally {
1143 Quiche.quiche_stream_iter_free(writableIterator);
1144 }
1145 writableStreams = growIfNeeded(writableStreams, totalWritable);
1146 }
1147 return mayNeedWrite;
1148 } finally {
1149 reantranceGuard &= ~IN_HANDLE_WRITABLE_STREAMS;
1150 }
1151 }
1152
1153
1154
1155
1156
1157
1158 void recvComplete() {
1159 try {
1160 QuicheQuicConnection conn = connection;
1161 if (conn.isFreed()) {
1162
1163 forceFlushParent();
1164 return;
1165 }
1166 fireChannelReadCompleteIfNeeded();
1167
1168
1169
1170 connectionSend(conn);
1171
1172
1173 forceFlushParent();
1174 freeIfClosed();
1175 } finally {
1176 inFireChannelReadCompleteQueue = false;
1177 }
1178 }
1179
1180 private void fireChannelReadCompleteIfNeeded() {
1181 if (fireChannelReadCompletePending) {
1182 fireChannelReadCompletePending = false;
1183 pipeline().fireChannelReadComplete();
1184 }
1185 }
1186
1187 private void fireExceptionEvents(QuicheQuicConnection conn, Throwable cause) {
1188 if (cause instanceof SSLHandshakeException) {
1189 notifyAboutHandshakeCompletionIfNeeded(conn, (SSLHandshakeException) cause);
1190 }
1191 pipeline().fireExceptionCaught(cause);
1192 }
1193
1194 private boolean runTasksDirectly() {
1195 return sslTaskExecutor == null || sslTaskExecutor == ImmediateExecutor.INSTANCE ||
1196 sslTaskExecutor == ImmediateEventExecutor.INSTANCE;
1197 }
1198
1199 private void runAllTaskSend(QuicheQuicConnection conn, Runnable task) {
1200 sslTaskExecutor.execute(decorateTaskSend(conn, task));
1201 }
1202
1203 private void runAll(QuicheQuicConnection conn, Runnable task) {
1204 do {
1205 task.run();
1206 } while ((task = conn.sslTask()) != null);
1207 }
1208
1209 private Runnable decorateTaskSend(QuicheQuicConnection conn, Runnable task) {
1210 return () -> {
1211 try {
1212 runAll(conn, task);
1213 } finally {
1214
1215 eventLoop().execute(() -> {
1216
1217 if (connectionSend(conn) != SendResult.NONE) {
1218 forceFlushParent();
1219 }
1220 freeIfClosed();
1221 });
1222 }
1223 };
1224 }
1225
1226 private SendResult connectionSendSegments(QuicheQuicConnection conn,
1227 SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
1228 if (conn.isClosed()) {
1229 return SendResult.NONE;
1230 }
1231 List<ByteBuf> bufferList = new ArrayList<>(segmentedDatagramPacketAllocator.maxNumSegments());
1232 long connAddr = conn.address();
1233 int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1234 SendResult sendResult = SendResult.NONE;
1235 boolean close = false;
1236 try {
1237 for (;;) {
1238 int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1239 ByteBuf out = alloc().directBuffer(len);
1240
1241 ByteBuffer sendInfo = conn.nextSendInfo();
1242 InetSocketAddress sendToAddress = this.remote;
1243
1244 int writerIndex = out.writerIndex();
1245 int written = Quiche.quiche_conn_send(
1246 connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1247 Quiche.memoryAddressWithPosition(sendInfo));
1248 if (written == 0) {
1249 out.release();
1250
1251 continue;
1252 }
1253 final boolean done;
1254 if (written < 0) {
1255 done = true;
1256 if (written != Quiche.QUICHE_ERR_DONE) {
1257 close = Quiche.shouldClose(written);
1258 Exception e = Quiche.convertToException(written);
1259 if (!tryFailConnectPromise(e)) {
1260
1261 fireExceptionEvents(conn, e);
1262 }
1263 }
1264 } else {
1265 done = false;
1266 }
1267 int size = bufferList.size();
1268 if (done) {
1269
1270 out.release();
1271
1272 switch (size) {
1273 case 0:
1274
1275 break;
1276 case 1:
1277
1278 parent().write(new DatagramPacket(bufferList.get(0), sendToAddress));
1279 sendResult = SendResult.SOME;
1280 break;
1281 default:
1282 int segmentSize = segmentSize(bufferList);
1283 ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
1284
1285 parent().write(segmentedDatagramPacketAllocator.newPacket(
1286 compositeBuffer, segmentSize, sendToAddress));
1287 sendResult = SendResult.SOME;
1288 break;
1289 }
1290 bufferList.clear();
1291 if (close) {
1292 sendResult = SendResult.CLOSE;
1293 }
1294 return sendResult;
1295 }
1296 out.writerIndex(writerIndex + written);
1297
1298 int segmentSize = -1;
1299 if (conn.isSendInfoChanged()) {
1300
1301 remote = QuicheSendInfo.getToAddress(sendInfo);
1302 local = QuicheSendInfo.getFromAddress(sendInfo);
1303
1304 if (size > 0) {
1305
1306
1307 segmentSize = segmentSize(bufferList);
1308 }
1309 } else if (size > 0) {
1310 int lastReadable = segmentSize(bufferList);
1311
1312
1313 if (lastReadable != out.readableBytes() ||
1314 size == segmentedDatagramPacketAllocator.maxNumSegments()) {
1315 segmentSize = lastReadable;
1316 }
1317 }
1318
1319
1320 if (segmentSize != -1) {
1321 final boolean stop;
1322 if (size == 1) {
1323
1324 stop = writePacket(new DatagramPacket(
1325 bufferList.get(0), sendToAddress), maxDatagramSize, len);
1326 } else {
1327
1328 ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
1329 stop = writePacket(segmentedDatagramPacketAllocator.newPacket(
1330 compositeBuffer, segmentSize, sendToAddress), maxDatagramSize, len);
1331 }
1332 bufferList.clear();
1333 sendResult = SendResult.SOME;
1334
1335 if (stop) {
1336
1337
1338
1339 if (out.isReadable()) {
1340 parent().write(new DatagramPacket(out, sendToAddress));
1341 } else {
1342 out.release();
1343 }
1344 if (close) {
1345 sendResult = SendResult.CLOSE;
1346 }
1347 return sendResult;
1348 }
1349 }
1350
1351
1352 out.touch(bufferList);
1353
1354 bufferList.add(out);
1355 }
1356 } finally {
1357
1358 }
1359 }
1360
1361 private static int segmentSize(List<ByteBuf> bufferList) {
1362 assert !bufferList.isEmpty();
1363 int size = bufferList.size();
1364 return bufferList.get(size - 1).readableBytes();
1365 }
1366
1367 private SendResult connectionSendSimple(QuicheQuicConnection conn) {
1368 if (conn.isClosed()) {
1369 return SendResult.NONE;
1370 }
1371 long connAddr = conn.address();
1372 SendResult sendResult = SendResult.NONE;
1373 boolean close = false;
1374 int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1375 for (;;) {
1376 ByteBuffer sendInfo = conn.nextSendInfo();
1377
1378 int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1379 ByteBuf out = alloc().directBuffer(len);
1380 int writerIndex = out.writerIndex();
1381
1382 int written = Quiche.quiche_conn_send(
1383 connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1384 Quiche.memoryAddressWithPosition(sendInfo));
1385
1386 if (written == 0) {
1387
1388 out.release();
1389 continue;
1390 }
1391 if (written < 0) {
1392 out.release();
1393 if (written != Quiche.QUICHE_ERR_DONE) {
1394 close = Quiche.shouldClose(written);
1395
1396 Exception e = Quiche.convertToException(written);
1397 if (!tryFailConnectPromise(e)) {
1398 fireExceptionEvents(conn, e);
1399 }
1400 }
1401 break;
1402 }
1403 if (conn.isSendInfoChanged()) {
1404
1405 remote = QuicheSendInfo.getToAddress(sendInfo);
1406 local = QuicheSendInfo.getFromAddress(sendInfo);
1407 }
1408 out.writerIndex(writerIndex + written);
1409 boolean stop = writePacket(new DatagramPacket(out, remote), maxDatagramSize, len);
1410 sendResult = SendResult.SOME;
1411 if (stop) {
1412
1413 break;
1414 }
1415 }
1416 if (close) {
1417 sendResult = SendResult.CLOSE;
1418 }
1419 return sendResult;
1420 }
1421
1422 private boolean writePacket(DatagramPacket packet, int maxDatagramSize, int len) {
1423 ChannelFuture future = parent().write(packet);
1424 if (isSendWindowUsed(maxDatagramSize, len)) {
1425
1426 future.addListener(continueSendingListener);
1427 return true;
1428 }
1429 return false;
1430 }
1431
1432 private static boolean isSendWindowUsed(int maxDatagramSize, int len) {
1433 return len < maxDatagramSize;
1434 }
1435
1436 private static int calculateSendBufferLength(long connAddr, int maxDatagramSize) {
1437 int len = Math.min(maxDatagramSize, Quiche.quiche_conn_send_quantum(connAddr));
1438 if (len <= 0) {
1439
1440
1441
1442
1443 return 8;
1444 }
1445 return len;
1446 }
1447
1448
1449
1450
1451
1452 private SendResult connectionSend(QuicheQuicConnection conn) {
1453 if (conn.isFreed()) {
1454 return SendResult.NONE;
1455 }
1456 if ((reantranceGuard & IN_CONNECTION_SEND) != 0) {
1457
1458 notifyEarlyDataReadyIfNeeded(conn);
1459 return SendResult.NONE;
1460 }
1461
1462 reantranceGuard |= IN_CONNECTION_SEND;
1463 try {
1464 SendResult sendResult;
1465 SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator =
1466 config.getSegmentedDatagramPacketAllocator();
1467 if (segmentedDatagramPacketAllocator.maxNumSegments() > 0) {
1468 sendResult = connectionSendSegments(conn, segmentedDatagramPacketAllocator);
1469 } else {
1470 sendResult = connectionSendSimple(conn);
1471 }
1472
1473
1474
1475 Runnable task = conn.sslTask();
1476 if (task != null) {
1477 if (runTasksDirectly()) {
1478
1479 do {
1480 task.run();
1481
1482 notifyEarlyDataReadyIfNeeded(conn);
1483 } while ((task = conn.sslTask()) != null);
1484
1485
1486
1487 eventLoop().execute(new Runnable() {
1488 @Override
1489 public void run() {
1490
1491 if (connectionSend(conn) != SendResult.NONE) {
1492 forceFlushParent();
1493 }
1494 freeIfClosed();
1495 }
1496 });
1497 } else {
1498 runAllTaskSend(conn, task);
1499 }
1500 } else {
1501
1502 notifyEarlyDataReadyIfNeeded(conn);
1503 }
1504
1505
1506 timeoutHandler.scheduleTimeout();
1507 return sendResult;
1508 } finally {
1509 reantranceGuard &= ~IN_CONNECTION_SEND;
1510 }
1511 }
1512
1513 private final class QuicChannelUnsafe extends AbstractChannel.AbstractUnsafe {
1514
1515 @Override
1516 protected void close(ChannelPromise promise, Throwable cause, ClosedChannelException closeCause) {
1517 final InetSocketAddress sendToAddress = remote;
1518 if (sendToAddress == null) {
1519 super.close(promise, cause, closeCause);
1520 } else {
1521 ChannelPromise p = newPromise();
1522 super.close(p.addListener(f -> {
1523
1524
1525
1526 parent().writeAndFlush(new DatagramPacket(Unpooled.EMPTY_BUFFER, sendToAddress))
1527 .addListener(wf -> {
1528 if (f.isSuccess()) {
1529 promise.setSuccess();
1530 } else {
1531 promise.setFailure(f.cause());
1532 }
1533 });
1534 }), cause, closeCause);
1535 }
1536 }
1537
1538 void connectStream(QuicStreamType type, @Nullable ChannelHandler handler,
1539 Promise<QuicStreamChannel> promise) {
1540 if (!promise.setUncancellable()) {
1541 return;
1542 }
1543 long streamId = idGenerator.nextStreamId(type == QuicStreamType.BIDIRECTIONAL);
1544
1545 try {
1546 int res = streamSend0(connection, streamId, Unpooled.EMPTY_BUFFER, false);
1547 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
1548 throw Quiche.convertToException(res);
1549 }
1550 } catch (Exception e) {
1551 promise.setFailure(e);
1552 return;
1553 }
1554 if (type == QuicStreamType.UNIDIRECTIONAL) {
1555 UNI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1556 } else {
1557 BIDI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1558 }
1559 QuicheQuicStreamChannel streamChannel = addNewStreamChannel(streamId);
1560 if (handler != null) {
1561 streamChannel.pipeline().addLast(handler);
1562 }
1563 eventLoop().register(streamChannel).addListener((ChannelFuture f) -> {
1564 if (f.isSuccess()) {
1565 promise.setSuccess(streamChannel);
1566 } else {
1567 promise.setFailure(f.cause());
1568 streams.remove(streamId);
1569 }
1570 });
1571 }
1572
1573 @Override
1574 public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
1575 assert eventLoop().inEventLoop();
1576 if (!channelPromise.setUncancellable()) {
1577 return;
1578 }
1579 if (server) {
1580 channelPromise.setFailure(new UnsupportedOperationException());
1581 return;
1582 }
1583
1584 if (connectPromise != null) {
1585 channelPromise.setFailure(new ConnectionPendingException());
1586 return;
1587 }
1588
1589 if (remote instanceof QuicConnectionAddress) {
1590 if (!sourceConnectionIds.isEmpty()) {
1591
1592 channelPromise.setFailure(new AlreadyConnectedException());
1593 return;
1594 }
1595
1596 connectLocalAddress = (QuicConnectionAddress) local;
1597 connectRemoteAddress = (QuicConnectionAddress) remote;
1598 connectPromise = channelPromise;
1599
1600
1601 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1602 if (connectTimeoutMillis > 0) {
1603 connectTimeoutFuture = eventLoop().schedule(() -> {
1604 ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1605 if (connectPromise != null && !connectPromise.isDone()
1606 && connectPromise.tryFailure(new ConnectTimeoutException(
1607 "connection timed out: " + local + " -> " + remote))) {
1608 close(voidPromise());
1609 }
1610 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1611 }
1612
1613 connectPromise.addListener((ChannelFuture future) -> {
1614 if (future.isCancelled()) {
1615 if (connectTimeoutFuture != null) {
1616 connectTimeoutFuture.cancel(false);
1617 }
1618 connectPromise = null;
1619 close(voidPromise());
1620 }
1621 });
1622
1623 parent().connect(new QuicheQuicChannelAddress(QuicheQuicChannel.this))
1624 .addListener(f -> {
1625 ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1626 if (connectPromise != null && !f.isSuccess()) {
1627 connectPromise.tryFailure(f.cause());
1628
1629 unsafe().closeForcibly();
1630 }
1631 });
1632 return;
1633 }
1634
1635 channelPromise.setFailure(new UnsupportedOperationException());
1636 }
1637
1638 private void fireConnectCloseEventIfNeeded(QuicheQuicConnection conn) {
1639 if (connectionCloseEvent == null && !conn.isFreed()) {
1640 connectionCloseEvent = Quiche.quiche_conn_peer_error(conn.address());
1641 if (connectionCloseEvent != null) {
1642 pipeline().fireUserEventTriggered(connectionCloseEvent);
1643 }
1644 }
1645 }
1646
1647 void connectionRecv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
1648 QuicheQuicConnection conn = QuicheQuicChannel.this.connection;
1649 if (conn.isFreed()) {
1650 return;
1651 }
1652 int bufferReadable = buffer.readableBytes();
1653 if (bufferReadable == 0) {
1654
1655
1656 return;
1657 }
1658
1659 reantranceGuard |= IN_RECV;
1660 boolean close = false;
1661 try {
1662 ByteBuf tmpBuffer = null;
1663
1664
1665 if (buffer.isReadOnly()) {
1666 tmpBuffer = alloc().directBuffer(buffer.readableBytes());
1667 tmpBuffer.writeBytes(buffer);
1668 buffer = tmpBuffer;
1669 }
1670 long memoryAddress = Quiche.readerMemoryAddress(buffer);
1671
1672 ByteBuffer recvInfo = conn.nextRecvInfo();
1673 QuicheRecvInfo.setRecvInfo(recvInfo, sender, recipient);
1674
1675 remote = sender;
1676 local = recipient;
1677
1678 try {
1679 do {
1680
1681 int res = Quiche.quiche_conn_recv(conn.address(), memoryAddress, bufferReadable,
1682 Quiche.memoryAddressWithPosition(recvInfo));
1683 final boolean done;
1684 if (res < 0) {
1685 done = true;
1686 if (res != Quiche.QUICHE_ERR_DONE) {
1687 close = Quiche.shouldClose(res);
1688 Exception e = Quiche.convertToException(res);
1689 if (tryFailConnectPromise(e)) {
1690 break;
1691 }
1692 fireExceptionEvents(conn, e);
1693 }
1694 } else {
1695 done = false;
1696 }
1697
1698 Runnable task = conn.sslTask();
1699 if (task != null) {
1700 if (runTasksDirectly()) {
1701
1702 do {
1703 task.run();
1704 } while ((task = conn.sslTask()) != null);
1705 processReceived(conn);
1706 } else {
1707 runAllTaskRecv(conn, task);
1708 }
1709 } else {
1710 processReceived(conn);
1711 }
1712
1713 if (done) {
1714 break;
1715 }
1716 memoryAddress += res;
1717 bufferReadable -= res;
1718 } while (bufferReadable > 0 && !conn.isFreed());
1719 } finally {
1720 buffer.skipBytes((int) (memoryAddress - Quiche.readerMemoryAddress(buffer)));
1721 if (tmpBuffer != null) {
1722 tmpBuffer.release();
1723 }
1724 }
1725 if (close) {
1726
1727 unsafe().close(newPromise());
1728 }
1729 } finally {
1730 reantranceGuard &= ~IN_RECV;
1731 }
1732 }
1733
1734 private void processReceived(QuicheQuicConnection conn) {
1735
1736 if (handlePendingChannelActive(conn)) {
1737
1738 return;
1739 }
1740
1741 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1742 fireConnectCloseEventIfNeeded(conn);
1743
1744 if (conn.isFreed()) {
1745 return;
1746 }
1747
1748 long connAddr = conn.address();
1749 if (Quiche.quiche_conn_is_established(connAddr) ||
1750 Quiche.quiche_conn_is_in_early_data(connAddr)) {
1751 long uniLeftOld = uniStreamsLeft;
1752 long bidiLeftOld = bidiStreamsLeft;
1753
1754 if (uniLeftOld == 0 || bidiLeftOld == 0) {
1755 long uniLeft = Quiche.quiche_conn_peer_streams_left_uni(connAddr);
1756 long bidiLeft = Quiche.quiche_conn_peer_streams_left_bidi(connAddr);
1757 uniStreamsLeft = uniLeft;
1758 bidiStreamsLeft = bidiLeft;
1759 if (uniLeftOld != uniLeft || bidiLeftOld != bidiLeft) {
1760 pipeline().fireUserEventTriggered(QuicStreamLimitChangedEvent.INSTANCE);
1761 }
1762 }
1763
1764 handlePathEvents(conn);
1765
1766 if (handleWritableStreams(conn)) {
1767
1768 flushParent();
1769 }
1770
1771 datagramReadable = true;
1772 streamReadable = true;
1773
1774 recvDatagram(conn);
1775 recvStream(conn);
1776 }
1777 }
1778
1779 private void handlePathEvents(QuicheQuicConnection conn) {
1780 long event;
1781 while (!conn.isFreed() && (event = Quiche.quiche_conn_path_event_next(conn.address())) > 0) {
1782 try {
1783 int type = Quiche.quiche_path_event_type(event);
1784
1785 if (type == Quiche.QUICHE_PATH_EVENT_NEW) {
1786 Object[] ret = Quiche.quiche_path_event_new(event);
1787 InetSocketAddress local = (InetSocketAddress) ret[0];
1788 InetSocketAddress peer = (InetSocketAddress) ret[1];
1789 pipeline().fireUserEventTriggered(new QuicPathEvent.New(local, peer));
1790 } else if (type == Quiche.QUICHE_PATH_EVENT_VALIDATED) {
1791 Object[] ret = Quiche.quiche_path_event_validated(event);
1792 InetSocketAddress local = (InetSocketAddress) ret[0];
1793 InetSocketAddress peer = (InetSocketAddress) ret[1];
1794 pipeline().fireUserEventTriggered(new QuicPathEvent.Validated(local, peer));
1795 } else if (type == Quiche.QUICHE_PATH_EVENT_FAILED_VALIDATION) {
1796 Object[] ret = Quiche.quiche_path_event_failed_validation(event);
1797 InetSocketAddress local = (InetSocketAddress) ret[0];
1798 InetSocketAddress peer = (InetSocketAddress) ret[1];
1799 pipeline().fireUserEventTriggered(new QuicPathEvent.FailedValidation(local, peer));
1800 } else if (type == Quiche.QUICHE_PATH_EVENT_CLOSED) {
1801 Object[] ret = Quiche.quiche_path_event_closed(event);
1802 InetSocketAddress local = (InetSocketAddress) ret[0];
1803 InetSocketAddress peer = (InetSocketAddress) ret[1];
1804 pipeline().fireUserEventTriggered(new QuicPathEvent.Closed(local, peer));
1805 } else if (type == Quiche.QUICHE_PATH_EVENT_REUSED_SOURCE_CONNECTION_ID) {
1806 Object[] ret = Quiche.quiche_path_event_reused_source_connection_id(event);
1807 Long seq = (Long) ret[0];
1808 InetSocketAddress localOld = (InetSocketAddress) ret[1];
1809 InetSocketAddress peerOld = (InetSocketAddress) ret[2];
1810 InetSocketAddress local = (InetSocketAddress) ret[3];
1811 InetSocketAddress peer = (InetSocketAddress) ret[4];
1812 pipeline().fireUserEventTriggered(
1813 new QuicPathEvent.ReusedSourceConnectionId(seq, localOld, peerOld, local, peer));
1814 } else if (type == Quiche.QUICHE_PATH_EVENT_PEER_MIGRATED) {
1815 Object[] ret = Quiche.quiche_path_event_peer_migrated(event);
1816 InetSocketAddress local = (InetSocketAddress) ret[0];
1817 InetSocketAddress peer = (InetSocketAddress) ret[1];
1818 pipeline().fireUserEventTriggered(new QuicPathEvent.PeerMigrated(local, peer));
1819 }
1820 } finally {
1821 Quiche.quiche_path_event_free(event);
1822 }
1823 }
1824 }
1825
1826 private void runAllTaskRecv(QuicheQuicConnection conn, Runnable task) {
1827 sslTaskExecutor.execute(decorateTaskRecv(conn, task));
1828 }
1829
1830 private Runnable decorateTaskRecv(QuicheQuicConnection conn, Runnable task) {
1831 return () -> {
1832 try {
1833 runAll(conn, task);
1834 } finally {
1835
1836 eventLoop().execute(() -> {
1837 if (!conn.isFreed()) {
1838 processReceived(conn);
1839
1840
1841 if (connectionSend(conn) != SendResult.NONE) {
1842 forceFlushParent();
1843 }
1844
1845 freeIfClosed();
1846 }
1847 });
1848 }
1849 };
1850 }
1851 void recv() {
1852 QuicheQuicConnection conn = connection;
1853 if ((reantranceGuard & IN_RECV) != 0 || conn.isFreed()) {
1854 return;
1855 }
1856
1857 long connAddr = conn.address();
1858
1859 if (!Quiche.quiche_conn_is_established(connAddr) &&
1860 !Quiche.quiche_conn_is_in_early_data(connAddr)) {
1861 return;
1862 }
1863
1864 reantranceGuard |= IN_RECV;
1865 try {
1866 recvDatagram(conn);
1867 recvStream(conn);
1868 } finally {
1869 fireChannelReadCompleteIfNeeded();
1870 reantranceGuard &= ~IN_RECV;
1871 }
1872 }
1873
1874 private void recvStream(QuicheQuicConnection conn) {
1875 if (conn.isFreed()) {
1876 return;
1877 }
1878 long connAddr = conn.address();
1879 long readableIterator = Quiche.quiche_conn_readable(connAddr);
1880 int totalReadable = 0;
1881 if (readableIterator != -1) {
1882 try {
1883
1884 if (recvStreamPending && streamReadable) {
1885 for (;;) {
1886 int readable = Quiche.quiche_stream_iter_next(
1887 readableIterator, readableStreams);
1888 for (int i = 0; i < readable; i++) {
1889 long streamId = readableStreams[i];
1890 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1891 if (streamChannel == null) {
1892 recvStreamPending = false;
1893 fireChannelReadCompletePending = true;
1894 streamChannel = addNewStreamChannel(streamId);
1895 streamChannel.readable();
1896 pipeline().fireChannelRead(streamChannel);
1897 } else {
1898 streamChannel.readable();
1899 }
1900 }
1901 if (readable < readableStreams.length) {
1902
1903 streamReadable = false;
1904 break;
1905 }
1906 if (readable > 0) {
1907 totalReadable += readable;
1908 }
1909 }
1910 }
1911 } finally {
1912 Quiche.quiche_stream_iter_free(readableIterator);
1913 }
1914 readableStreams = growIfNeeded(readableStreams, totalReadable);
1915 }
1916 }
1917
1918 private void recvDatagram(QuicheQuicConnection conn) {
1919 if (!supportsDatagram) {
1920 return;
1921 }
1922 while (recvDatagramPending && datagramReadable && !conn.isFreed()) {
1923 @SuppressWarnings("deprecation")
1924 RecvByteBufAllocator.Handle recvHandle = recvBufAllocHandle();
1925 recvHandle.reset(config());
1926
1927 int numMessagesRead = 0;
1928 do {
1929 long connAddr = conn.address();
1930 int len = Quiche.quiche_conn_dgram_recv_front_len(connAddr);
1931 if (len == Quiche.QUICHE_ERR_DONE) {
1932 datagramReadable = false;
1933 return;
1934 }
1935
1936 ByteBuf datagramBuffer = alloc().directBuffer(len);
1937 recvHandle.attemptedBytesRead(datagramBuffer.writableBytes());
1938 int writerIndex = datagramBuffer.writerIndex();
1939 long memoryAddress = Quiche.writerMemoryAddress(datagramBuffer);
1940
1941 int written = Quiche.quiche_conn_dgram_recv(connAddr,
1942 memoryAddress, datagramBuffer.writableBytes());
1943 if (written < 0) {
1944 datagramBuffer.release();
1945 if (written == Quiche.QUICHE_ERR_DONE) {
1946
1947 datagramReadable = false;
1948 break;
1949 }
1950 pipeline().fireExceptionCaught(Quiche.convertToException(written));
1951 }
1952 recvHandle.lastBytesRead(written);
1953 recvHandle.incMessagesRead(1);
1954 numMessagesRead++;
1955 datagramBuffer.writerIndex(writerIndex + written);
1956 recvDatagramPending = false;
1957 fireChannelReadCompletePending = true;
1958
1959 pipeline().fireChannelRead(datagramBuffer);
1960 } while (recvHandle.continueReading() && !conn.isFreed());
1961 recvHandle.readComplete();
1962
1963
1964 if (numMessagesRead > 0) {
1965 fireChannelReadCompleteIfNeeded();
1966 }
1967 }
1968 }
1969
1970 private boolean handlePendingChannelActive(QuicheQuicConnection conn) {
1971 if (conn.isFreed() || state == ChannelState.CLOSED) {
1972 return true;
1973 }
1974 if (server) {
1975 if (state == ChannelState.OPEN && Quiche.quiche_conn_is_established(conn.address())) {
1976
1977 state = ChannelState.ACTIVE;
1978
1979 fireDatagramExtensionEvent(conn);
1980 pipeline().fireChannelActive();
1981 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1982 }
1983 } else if (connectPromise != null && Quiche.quiche_conn_is_established(conn.address())) {
1984 ChannelPromise promise = connectPromise;
1985 connectPromise = null;
1986 state = ChannelState.ACTIVE;
1987
1988 boolean promiseSet = promise.trySuccess();
1989 fireDatagramExtensionEvent(conn);
1990 pipeline().fireChannelActive();
1991 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1992 if (!promiseSet) {
1993 fireConnectCloseEventIfNeeded(conn);
1994 this.close(this.voidPromise());
1995 return true;
1996 }
1997 }
1998 return false;
1999 }
2000
2001 private void fireDatagramExtensionEvent(QuicheQuicConnection conn) {
2002 if (conn.isClosed()) {
2003 return;
2004 }
2005 long connAddr = conn.address();
2006 int len = Quiche.quiche_conn_dgram_max_writable_len(connAddr);
2007
2008 if (len != Quiche.QUICHE_ERR_DONE) {
2009 pipeline().fireUserEventTriggered(new QuicDatagramExtensionEvent(len));
2010 }
2011 }
2012
2013 private QuicheQuicStreamChannel addNewStreamChannel(long streamId) {
2014 QuicheQuicStreamChannel streamChannel = new QuicheQuicStreamChannel(
2015 QuicheQuicChannel.this, streamId);
2016 QuicheQuicStreamChannel old = streams.put(streamId, streamChannel);
2017 assert old == null;
2018 streamChannel.writable(streamCapacity(streamId));
2019 return streamChannel;
2020 }
2021 }
2022
2023
2024
2025
2026 void finishConnect() {
2027 assert !server;
2028 assert connection != null;
2029 if (connectionSend(connection) != SendResult.NONE) {
2030 flushParent();
2031 }
2032 }
2033
2034 private void notifyEarlyDataReadyIfNeeded(QuicheQuicConnection conn) {
2035 if (!server && !earlyDataReadyNotified &&
2036 !conn.isFreed() && Quiche.quiche_conn_is_in_early_data(conn.address())) {
2037 earlyDataReadyNotified = true;
2038 pipeline().fireUserEventTriggered(SslEarlyDataReadyEvent.INSTANCE);
2039 }
2040 }
2041
2042 private final class TimeoutHandler implements Runnable {
2043 private ScheduledFuture<?> timeoutFuture;
2044
2045 @Override
2046 public void run() {
2047 QuicheQuicConnection conn = connection;
2048 if (conn.isFreed()) {
2049 return;
2050 }
2051 if (!freeIfClosed()) {
2052 long connAddr = conn.address();
2053 timeoutFuture = null;
2054
2055 Quiche.quiche_conn_on_timeout(connAddr);
2056 if (!freeIfClosed()) {
2057
2058
2059 if (connectionSend(conn) != SendResult.NONE) {
2060 flushParent();
2061 }
2062 boolean closed = freeIfClosed();
2063 if (!closed) {
2064
2065 scheduleTimeout();
2066 }
2067 }
2068 }
2069 }
2070
2071
2072
2073 void scheduleTimeout() {
2074 QuicheQuicConnection conn = connection;
2075 if (conn.isFreed()) {
2076 cancel();
2077 return;
2078 }
2079 if (conn.isClosed()) {
2080 cancel();
2081 unsafe().close(newPromise());
2082 return;
2083 }
2084 long nanos = Quiche.quiche_conn_timeout_as_nanos(conn.address());
2085 if (nanos < 0 || nanos == Long.MAX_VALUE) {
2086
2087 cancel();
2088 return;
2089 }
2090 if (timeoutFuture == null) {
2091 timeoutFuture = eventLoop().schedule(this,
2092 nanos, TimeUnit.NANOSECONDS);
2093 } else {
2094 long remaining = timeoutFuture.getDelay(TimeUnit.NANOSECONDS);
2095 if (remaining <= 0) {
2096
2097
2098 cancel();
2099 run();
2100 } else if (remaining > nanos) {
2101
2102
2103 cancel();
2104 timeoutFuture = eventLoop().schedule(this, nanos, TimeUnit.NANOSECONDS);
2105 }
2106 }
2107 }
2108
2109 void cancel() {
2110 if (timeoutFuture != null) {
2111 timeoutFuture.cancel(false);
2112 timeoutFuture = null;
2113 }
2114 }
2115 }
2116
2117 @Override
2118 public Future<QuicConnectionStats> collectStats(Promise<QuicConnectionStats> promise) {
2119 if (eventLoop().inEventLoop()) {
2120 collectStats0(promise);
2121 } else {
2122 eventLoop().execute(() -> collectStats0(promise));
2123 }
2124 return promise;
2125 }
2126
2127 private void collectStats0(Promise<QuicConnectionStats> promise) {
2128 QuicheQuicConnection conn = connection;
2129 if (conn.isFreed()) {
2130 promise.setSuccess(statsAtClose);
2131 return;
2132 }
2133
2134 collectStats0(connection, promise);
2135 }
2136
2137 @Nullable
2138 private QuicConnectionStats collectStats0(QuicheQuicConnection connection, Promise<QuicConnectionStats> promise) {
2139 final long[] stats = Quiche.quiche_conn_stats(connection.address());
2140 if (stats == null) {
2141 promise.setFailure(new IllegalStateException("native quiche_conn_stats(...) failed"));
2142 return null;
2143 }
2144
2145 final QuicheQuicConnectionStats connStats =
2146 new QuicheQuicConnectionStats(stats);
2147 promise.setSuccess(connStats);
2148 return connStats;
2149 }
2150
2151 @Override
2152 public Future<QuicConnectionPathStats> collectPathStats(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2153 if (eventLoop().inEventLoop()) {
2154 collectPathStats0(pathIdx, promise);
2155 } else {
2156 eventLoop().execute(() -> collectPathStats0(pathIdx, promise));
2157 }
2158 return promise;
2159 }
2160
2161 private void collectPathStats0(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2162 QuicheQuicConnection conn = connection;
2163 if (conn.isFreed()) {
2164 promise.setFailure(new IllegalStateException("Connection is closed"));
2165 return;
2166 }
2167
2168 final Object[] stats = Quiche.quiche_conn_path_stats(connection.address(), pathIdx);
2169 if (stats == null) {
2170 promise.setFailure(new IllegalStateException("native quiche_conn_path_stats(...) failed"));
2171 return;
2172 }
2173 promise.setSuccess(new QuicheQuicConnectionPathStats(stats));
2174 }
2175
2176 @Override
2177 public QuicTransportParameters peerTransportParameters() {
2178 return connection.peerParameters();
2179 }
2180 }