View Javadoc
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.quic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.AbstractChannel;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOption;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.RecvByteBufAllocator;
35  import io.netty.channel.socket.DatagramPacket;
36  import io.netty.handler.ssl.SniCompletionEvent;
37  import io.netty.handler.ssl.SslHandshakeCompletionEvent;
38  import io.netty.util.AttributeKey;
39  import io.netty.util.collection.LongObjectHashMap;
40  import io.netty.util.collection.LongObjectMap;
41  import io.netty.util.concurrent.Future;
42  import io.netty.util.concurrent.ImmediateEventExecutor;
43  import io.netty.util.concurrent.ImmediateExecutor;
44  import io.netty.util.concurrent.Promise;
45  import io.netty.util.internal.StringUtil;
46  import io.netty.util.internal.logging.InternalLogger;
47  import io.netty.util.internal.logging.InternalLoggerFactory;
48  import org.jetbrains.annotations.Nullable;
49  
50  import javax.net.ssl.SSLEngine;
51  import javax.net.ssl.SSLHandshakeException;
52  import java.io.File;
53  import java.net.ConnectException;
54  import java.net.InetSocketAddress;
55  import java.net.SocketAddress;
56  import java.nio.BufferUnderflowException;
57  import java.nio.ByteBuffer;
58  import java.nio.channels.AlreadyConnectedException;
59  import java.nio.channels.ClosedChannelException;
60  import java.nio.channels.ConnectionPendingException;
61  import java.util.ArrayList;
62  import java.util.Collections;
63  import java.util.HashSet;
64  import java.util.List;
65  import java.util.Map;
66  import java.util.Set;
67  import java.util.concurrent.Executor;
68  import java.util.concurrent.ScheduledFuture;
69  import java.util.concurrent.TimeUnit;
70  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
71  import java.util.function.Consumer;
72  import java.util.function.Function;
73  
74  /**
75   * {@link QuicChannel} implementation that uses <a href="https://github.com/cloudflare/quiche">quiche</a>.
76   */
77  final class QuicheQuicChannel extends AbstractChannel implements QuicChannel {
78      private static final InternalLogger logger = InternalLoggerFactory.getInstance(QuicheQuicChannel.class);
79      private static final String QLOG_FILE_EXTENSION = ".qlog";
80  
/**
 * Possible results of receiving data from a stream.
 */
enum StreamRecvResult {
    /** The stream has nothing more to read. */
    DONE,

    /** The FIN flag was received. */
    FIN,

    /** A regular read that did not carry the FIN flag. */
    OK
}
95  
96      private enum ChannelState {
97          OPEN,
98          ACTIVE,
99          CLOSED
100     }
101 
102     private enum SendResult {
103         SOME,
104         NONE,
105         CLOSE
106     }
107 
108     private static final class CloseData implements ChannelFutureListener {
109         final boolean applicationClose;
110         final int err;
111         final ByteBuf reason;
112 
113         CloseData(boolean applicationClose, int err, ByteBuf reason) {
114             this.applicationClose = applicationClose;
115             this.err = err;
116             this.reason = reason;
117         }
118 
119         @Override
120         public void operationComplete(ChannelFuture future) {
121             reason.release();
122         }
123     }
124 
125     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
126     private long[] readableStreams = new long[4];
127     private long[] writableStreams = new long[4];
128     private final LongObjectMap<QuicheQuicStreamChannel> streams = new LongObjectHashMap<>();
129     private final QuicheQuicChannelConfig config;
130     private final boolean server;
131     private final QuicStreamIdGenerator idGenerator;
132     private final ChannelHandler streamHandler;
133     private final Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray;
134     private final Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray;
135     private final TimeoutHandler timeoutHandler;
136     private final QuicConnectionIdGenerator connectionIdAddressGenerator;
137     private final QuicResetTokenGenerator resetTokenGenerator;
138     private final Set<ByteBuffer> sourceConnectionIds = new HashSet<>();
139 
140     private Consumer<QuicheQuicChannel> freeTask;
141     private Executor sslTaskExecutor;
142     private boolean inFireChannelReadCompleteQueue;
143     private boolean fireChannelReadCompletePending;
144     private ByteBuf finBuffer;
145     private ChannelPromise connectPromise;
146     private ScheduledFuture<?> connectTimeoutFuture;
147     private QuicConnectionAddress connectAddress;
148     private CloseData closeData;
149     private QuicConnectionCloseEvent connectionCloseEvent;
150     private QuicConnectionStats statsAtClose;
151     private boolean supportsDatagram;
152     private boolean recvDatagramPending;
153     private boolean datagramReadable;
154     private boolean recvStreamPending;
155     private boolean streamReadable;
156     private boolean handshakeCompletionNotified;
157     private boolean earlyDataReadyNotified;
158     private int reantranceGuard;
159     private static final int IN_RECV = 1 << 1;
160     private static final int IN_CONNECTION_SEND = 1 << 2;
161     private static final int IN_HANDLE_WRITABLE_STREAMS = 1 << 3;
162     private volatile ChannelState state = ChannelState.OPEN;
163     private volatile boolean timedOut;
164     private volatile String traceId;
165     private volatile QuicheQuicConnection connection;
166     private volatile InetSocketAddress local;
167     private volatile InetSocketAddress remote;
168 
169     private final ChannelFutureListener continueSendingListener = f -> {
170         if (connectionSend(connection) != SendResult.NONE) {
171             flushParent();
172         }
173     };
174 
175     private static final AtomicLongFieldUpdater<QuicheQuicChannel> UNI_STREAMS_LEFT_UPDATER =
176             AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "uniStreamsLeft");
177     private volatile long uniStreamsLeft;
178 
179     private static final AtomicLongFieldUpdater<QuicheQuicChannel> BIDI_STREAMS_LEFT_UPDATER =
180             AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "bidiStreamsLeft");
181     private volatile long bidiStreamsLeft;
182 
/**
 * Creates a new channel. Use {@code forClient(...)} or {@code forServer(...)} instead of calling this directly.
 *
 * @param key             initial source connection id to remember, or {@code null} if there is none yet
 * @param sslTaskExecutor executor for SSL tasks; {@link ImmediateExecutor#INSTANCE} is used when {@code null}
 */
private QuicheQuicChannel(Channel parent, boolean server, @Nullable ByteBuffer key, InetSocketAddress local,
                          InetSocketAddress remote, boolean supportsDatagram, ChannelHandler streamHandler,
                          Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
                          Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
                          @Nullable Consumer<QuicheQuicChannel> freeTask,
                          @Nullable Executor sslTaskExecutor,
                          @Nullable QuicConnectionIdGenerator connectionIdAddressGenerator,
                          @Nullable QuicResetTokenGenerator resetTokenGenerator) {
    super(parent);
    this.config = new QuicheQuicChannelConfig(this);

    // Role and identity.
    this.server = server;
    this.idGenerator = new QuicStreamIdGenerator(server);
    this.connectionIdAddressGenerator = connectionIdAddressGenerator;
    this.resetTokenGenerator = resetTokenGenerator;
    if (key != null) {
        this.sourceConnectionIds.add(key);
    }

    // Transport details.
    this.local = local;
    this.remote = remote;
    this.supportsDatagram = supportsDatagram;

    // Configuration for child stream channels.
    this.streamHandler = streamHandler;
    this.streamOptionsArray = streamOptionsArray;
    this.streamAttrsArray = streamAttrsArray;

    this.freeTask = freeTask;
    this.timeoutHandler = new TimeoutHandler();
    if (sslTaskExecutor == null) {
        this.sslTaskExecutor = ImmediateExecutor.INSTANCE;
    } else {
        this.sslTaskExecutor = sslTaskExecutor;
    }
}
212 
213     static QuicheQuicChannel forClient(Channel parent, InetSocketAddress local, InetSocketAddress remote,
214                                        ChannelHandler streamHandler,
215                                        Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
216                                        Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray) {
217         return new QuicheQuicChannel(parent, false, null, local, remote, false, streamHandler,
218                 streamOptionsArray, streamAttrsArray, null, null, null, null);
219     }
220 
221     static QuicheQuicChannel forServer(Channel parent, ByteBuffer key, InetSocketAddress local,
222                                        InetSocketAddress remote,
223                                        boolean supportsDatagram, ChannelHandler streamHandler,
224                                        Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
225                                        Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
226                                        Consumer<QuicheQuicChannel> freeTask, Executor sslTaskExecutor,
227                                        QuicConnectionIdGenerator connectionIdAddressGenerator,
228                                        QuicResetTokenGenerator resetTokenGenerator) {
229         return new QuicheQuicChannel(parent, true, key, local, remote, supportsDatagram,
230                 streamHandler, streamOptionsArray, streamAttrsArray, freeTask,
231                 sslTaskExecutor, connectionIdAddressGenerator, resetTokenGenerator);
232     }
233 
234     private static final int MAX_ARRAY_LEN = 128;
235 
236     private static long[] growIfNeeded(long[] array, int maxLength) {
237         if (maxLength > array.length) {
238             if (array.length == MAX_ARRAY_LEN) {
239                 return array;
240             }
241             // Increase by 4 until we reach MAX_ARRAY_LEN
242             return new long[Math.min(MAX_ARRAY_LEN, array.length + 4)];
243         }
244         return array;
245     }
246 
247     @Override
248     public boolean isTimedOut() {
249         return timedOut;
250     }
251 
252     @Override
253     public SSLEngine sslEngine() {
254         QuicheQuicConnection connection = this.connection;
255         return connection == null ? null : connection.engine();
256     }
257 
258     private void notifyAboutHandshakeCompletionIfNeeded(QuicheQuicConnection conn,
259                                                         @Nullable SSLHandshakeException cause) {
260         if (handshakeCompletionNotified) {
261             return;
262         }
263         if (cause != null) {
264             pipeline().fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
265             return;
266         }
267         if (conn.isFreed()) {
268             return;
269         }
270         switch (connection.engine().getHandshakeStatus()) {
271             case NOT_HANDSHAKING:
272             case FINISHED:
273                 handshakeCompletionNotified = true;
274                 pipeline().fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
275                 break;
276             default:
277                 break;
278         }
279     }
280 
281     @Override
282     public long peerAllowedStreams(QuicStreamType type) {
283         switch (type) {
284             case BIDIRECTIONAL:
285                 return bidiStreamsLeft;
286             case UNIDIRECTIONAL:
287                 return uniStreamsLeft;
288             default:
289                 return 0;
290         }
291     }
292 
293     void attachQuicheConnection(QuicheQuicConnection connection) {
294         this.connection = connection;
295 
296         byte[] traceId = Quiche.quiche_conn_trace_id(connection.address());
297         if (traceId != null) {
298             this.traceId = new String(traceId);
299         }
300 
301         connection.init(local, remote,
302                 sniHostname -> pipeline().fireUserEventTriggered(new SniCompletionEvent(sniHostname)));
303 
304         // Setup QLOG if needed.
305         QLogConfiguration configuration = config.getQLogConfiguration();
306         if (configuration != null) {
307             final String fileName;
308             File file = new File(configuration.path());
309             if (file.isDirectory()) {
310                 // Create directory if needed.
311                 file.mkdir();
312                 if (this.traceId != null) {
313                     fileName = configuration.path() + File.separatorChar + this.traceId + "-" +
314                             id().asShortText() + QLOG_FILE_EXTENSION;
315                 } else {
316                     fileName = configuration.path() + File.separatorChar + id().asShortText() + QLOG_FILE_EXTENSION;
317                 }
318             } else {
319                 fileName = configuration.path();
320             }
321 
322             if (!Quiche.quiche_conn_set_qlog_path(connection.address(), fileName,
323                     configuration.logTitle(), configuration.logDescription())) {
324                 logger.info("Unable to create qlog file: {} ", fileName);
325             }
326         }
327     }
328 
329     void connectNow(Function<QuicChannel, ? extends QuicSslEngine> engineProvider, Executor sslTaskExecutor,
330                     Consumer<QuicheQuicChannel> freeTask, long configAddr, int localConnIdLength,
331                     boolean supportsDatagram, ByteBuffer fromSockaddrMemory, ByteBuffer toSockaddrMemory)
332             throws Exception {
333         assert this.connection == null;
334         assert this.traceId == null;
335         assert this.sourceConnectionIds.isEmpty();
336 
337         this.sslTaskExecutor = sslTaskExecutor;
338         this.freeTask = freeTask;
339 
340         QuicConnectionAddress address = this.connectAddress;
341 
342         if (address == QuicConnectionAddress.EPHEMERAL) {
343             address = QuicConnectionAddress.random(localConnIdLength);
344         }
345         ByteBuffer connectId = address.id();
346         if (connectId.remaining() != localConnIdLength) {
347             failConnectPromiseAndThrow(new IllegalArgumentException("connectionAddress has length "
348                     + connectId.remaining()
349                     + " instead of " + localConnIdLength));
350         }
351         QuicSslEngine engine = engineProvider.apply(this);
352         if (!(engine instanceof QuicheQuicSslEngine)) {
353             failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not of type "
354                     + QuicheQuicSslEngine.class.getSimpleName()));
355             return;
356         }
357         if (!engine.getUseClientMode()) {
358             failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not create in client mode"));
359         }
360         QuicheQuicSslEngine quicheEngine = (QuicheQuicSslEngine) engine;
361         ByteBuf idBuffer = alloc().directBuffer(connectId.remaining()).writeBytes(connectId.duplicate());
362         try {
363             int fromSockaddrLen = SockaddrIn.setAddress(fromSockaddrMemory, local);
364             int toSockaddrLen = SockaddrIn.setAddress(toSockaddrMemory, remote);
365             QuicheQuicConnection connection = quicheEngine.createConnection(ssl ->
366                     Quiche.quiche_conn_new_with_tls(Quiche.readerMemoryAddress(idBuffer),
367                             idBuffer.readableBytes(), -1, -1,
368                             Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
369                             Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
370                             configAddr, ssl, false));
371             if (connection == null) {
372                 failConnectPromiseAndThrow(new ConnectException());
373                 return;
374             }
375             attachQuicheConnection(connection);
376             QuicClientSessionCache sessionCache = quicheEngine.ctx.getSessionCache();
377             if (sessionCache != null) {
378                 byte[] sessionBytes = sessionCache
379                         .getSession(quicheEngine.getSession().getPeerHost(), quicheEngine.getSession().getPeerPort());
380                 if (sessionBytes != null) {
381                     Quiche.quiche_conn_set_session(connection.address(), sessionBytes);
382                 }
383             }
384             this.supportsDatagram = supportsDatagram;
385             sourceConnectionIds.add(connectId);
386         } finally {
387             idBuffer.release();
388         }
389     }
390 
391     private void failConnectPromiseAndThrow(Exception e) throws Exception {
392         tryFailConnectPromise(e);
393         throw e;
394     }
395 
396     private boolean tryFailConnectPromise(Exception e) {
397         ChannelPromise promise = connectPromise;
398         if (promise != null) {
399             connectPromise = null;
400             promise.tryFailure(e);
401             return true;
402         }
403         return false;
404     }
405 
406     Set<ByteBuffer> sourceConnectionIds() {
407         return sourceConnectionIds;
408     }
409 
410     boolean markInFireChannelReadCompleteQueue() {
411         if (inFireChannelReadCompleteQueue) {
412             return false;
413         }
414         inFireChannelReadCompleteQueue = true;
415         return true;
416     }
417 
418     private void failPendingConnectPromise() {
419         ChannelPromise promise = QuicheQuicChannel.this.connectPromise;
420         if (promise != null) {
421             QuicheQuicChannel.this.connectPromise = null;
422             promise.tryFailure(new QuicClosedChannelException(this.connectionCloseEvent));
423         }
424     }
425 
426     void forceClose() {
427         unsafe().close(voidPromise());
428     }
429 
430     @Override
431     protected DefaultChannelPipeline newChannelPipeline() {
432         return new DefaultChannelPipeline(this) {
433             @Override
434             protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
435                 if (msg instanceof QuicStreamChannel) {
436                     QuicStreamChannel channel = (QuicStreamChannel) msg;
437                     Quic.setupChannel(channel, streamOptionsArray, streamAttrsArray, streamHandler, logger);
438                     ctx.channel().eventLoop().register(channel);
439                 } else {
440                     super.onUnhandledInboundMessage(ctx, msg);
441                 }
442             }
443         };
444     }
445 
446     @Override
447     public QuicChannel flush() {
448         super.flush();
449         return this;
450     }
451 
452     @Override
453     public QuicChannel read() {
454         super.read();
455         return this;
456     }
457 
458     @Override
459     public Future<QuicStreamChannel> createStream(QuicStreamType type, @Nullable ChannelHandler handler,
460                                                   Promise<QuicStreamChannel> promise) {
461         if (eventLoop().inEventLoop()) {
462             ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise);
463         } else {
464             eventLoop().execute(() -> ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise));
465         }
466         return promise;
467     }
468 
469     @Override
470     public ChannelFuture close(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
471         if (eventLoop().inEventLoop()) {
472             close0(applicationClose, error, reason, promise);
473         } else {
474             eventLoop().execute(() -> close0(applicationClose, error, reason, promise));
475         }
476         return promise;
477     }
478 
479     private void close0(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
480         if (closeData == null) {
481             if (!reason.hasMemoryAddress()) {
482                 // Copy to direct buffer as that's what we need.
483                 ByteBuf copy = alloc().directBuffer(reason.readableBytes()).writeBytes(reason);
484                 reason.release();
485                 reason = copy;
486             }
487             closeData = new CloseData(applicationClose, error, reason);
488             promise.addListener(closeData);
489         } else {
490             // We already have a close scheduled that uses a close data. Lets release the buffer early.
491             reason.release();
492         }
493         close(promise);
494     }
495 
496     @Override
497     public String toString() {
498         String traceId = this.traceId;
499         if (traceId == null) {
500             return "()" + super.toString();
501         } else {
502             return '(' + traceId + ')' + super.toString();
503         }
504     }
505 
506     @Override
507     protected AbstractUnsafe newUnsafe() {
508         return new QuicChannelUnsafe();
509     }
510 
511     @Override
512     protected boolean isCompatible(EventLoop eventLoop) {
513         return parent().eventLoop() == eventLoop;
514     }
515 
516     @Override
517     @Nullable
518     protected QuicConnectionAddress localAddress0() {
519         QuicheQuicConnection connection = this.connection;
520         return connection == null ? null : connection.sourceId();
521     }
522 
523     @Override
524     @Nullable
525     protected QuicConnectionAddress remoteAddress0() {
526         QuicheQuicConnection connection = this.connection;
527         return connection == null ? null : connection.destinationId();
528     }
529 
530     @Override
531     @Nullable
532     public QuicConnectionAddress localAddress() {
533         // Override so we never cache as the sourceId() can change over life-time.
534         return localAddress0();
535     }
536 
537     @Override
538     @Nullable
539     public QuicConnectionAddress remoteAddress() {
540         // Override so we never cache as the destinationId() can change over life-time.
541         return remoteAddress0();
542     }
543 
544     @Override
545     @Nullable
546     public SocketAddress localSocketAddress() {
547         return local;
548     }
549 
550     @Override
551     @Nullable
552     public SocketAddress remoteSocketAddress() {
553         return remote;
554     }
555 
556     @Override
557     protected void doBind(SocketAddress socketAddress) {
558         throw new UnsupportedOperationException();
559     }
560 
561     @Override
562     protected void doDisconnect() throws Exception {
563         doClose();
564     }
565 
566     @Override
567     protected void doClose() throws Exception {
568         if (state == ChannelState.CLOSED) {
569             return;
570         }
571         state = ChannelState.CLOSED;
572 
573         QuicheQuicConnection conn = this.connection;
574         if (conn == null || conn.isFreed()) {
575             if (closeData != null) {
576                 closeData.reason.release();
577                 closeData = null;
578             }
579             failPendingConnectPromise();
580             return;
581         }
582 
583         // Call connectionSend() so we ensure we send all that is queued before we close the channel
584         SendResult sendResult = connectionSend(conn);
585 
586         final boolean app;
587         final int err;
588         final ByteBuf reason;
589         if (closeData == null) {
590             app = false;
591             err = 0;
592             reason = Unpooled.EMPTY_BUFFER;
593         } else {
594             app = closeData.applicationClose;
595             err = closeData.err;
596             reason = closeData.reason;
597             closeData = null;
598         }
599 
600         failPendingConnectPromise();
601         try {
602             int res = Quiche.quiche_conn_close(conn.address(), app, err,
603                     Quiche.readerMemoryAddress(reason), reason.readableBytes());
604             if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
605                 throw Quiche.convertToException(res);
606             }
607             // As we called quiche_conn_close(...) we need to ensure we will call quiche_conn_send(...) either
608             // now or we will do so once we see the channelReadComplete event.
609             //
610             // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.close
611             if (connectionSend(conn) == SendResult.SOME) {
612                 sendResult = SendResult.SOME;
613             }
614         } finally {
615 
616             // making sure that connection statistics is available
617             // even after channel is closed
618             statsAtClose = collectStats0(conn, eventLoop().newPromise());
619             try {
620                 timedOut = Quiche.quiche_conn_is_timed_out(conn.address());
621 
622                 closeStreams();
623                 if (finBuffer != null) {
624                     finBuffer.release();
625                     finBuffer = null;
626                 }
627             } finally {
628                 if (sendResult == SendResult.SOME) {
629                     // As this is the close let us flush it asap.
630                     forceFlushParent();
631                 } else {
632                     flushParent();
633                 }
634                 conn.free();
635                 if (freeTask != null) {
636                     freeTask.accept(this);
637                 }
638                 timeoutHandler.cancel();
639 
640                 local = null;
641                 remote = null;
642             }
643         }
644     }
645 
646     @Override
647     protected void doBeginRead() {
648         recvDatagramPending = true;
649         recvStreamPending = true;
650         if (datagramReadable || streamReadable) {
651             ((QuicChannelUnsafe) unsafe()).recv();
652         }
653     }
654 
655     @Override
656     protected Object filterOutboundMessage(Object msg) {
657         if (msg instanceof ByteBuf) {
658             return msg;
659         }
660         throw new UnsupportedOperationException("Unsupported message type: " + StringUtil.simpleClassName(msg));
661     }
662 
663     @Override
664     protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) throws Exception {
665         if (!supportsDatagram) {
666             throw new UnsupportedOperationException("Datagram extension is not supported");
667         }
668         boolean sendSomething = false;
669         boolean retry = false;
670         QuicheQuicConnection conn = connection;
671         try {
672             for (;;) {
673                 ByteBuf buffer = (ByteBuf) channelOutboundBuffer.current();
674                 if (buffer == null) {
675                     break;
676                 }
677 
678                 int readable = buffer.readableBytes();
679                 if (readable == 0) {
680                     // Skip empty buffers.
681                     channelOutboundBuffer.remove();
682                     continue;
683                 }
684 
685                 final int res;
686                 if (!buffer.isDirect() || buffer.nioBufferCount() > 1) {
687                     ByteBuf tmpBuffer = alloc().directBuffer(readable);
688                     try {
689                         tmpBuffer.writeBytes(buffer, buffer.readerIndex(), readable);
690                         res = sendDatagram(conn, tmpBuffer);
691                     } finally {
692                         tmpBuffer.release();
693                     }
694                 } else {
695                     res = sendDatagram(conn, buffer);
696                 }
697                 if (res >= 0) {
698                     channelOutboundBuffer.remove();
699                     sendSomething = true;
700                     retry = false;
701                 } else {
702                     if (res == Quiche.QUICHE_ERR_BUFFER_TOO_SHORT) {
703                         retry = false;
704                         channelOutboundBuffer.remove(new BufferUnderflowException());
705                     } else if (res == Quiche.QUICHE_ERR_INVALID_STATE) {
706                         throw new UnsupportedOperationException("Remote peer does not support Datagram extension");
707                     } else if (res == Quiche.QUICHE_ERR_DONE) {
708                         if (retry) {
709                             // We already retried and it didn't work. Let's drop the datagrams on the floor.
710                             for (;;) {
711                                 if (!channelOutboundBuffer.remove()) {
712                                     // The buffer is empty now.
713                                     return;
714                                 }
715                             }
716                         }
717                         // Set sendSomething to false a we will call connectionSend() now.
718                         sendSomething = false;
719                         // If this returned DONE we couldn't write anymore. This happens if the internal queue
720                         // is full. In this case we should call quiche_conn_send(...) and so make space again.
721                         if (connectionSend(conn) != SendResult.NONE) {
722                             forceFlushParent();
723                         }
724                         // Let's try again to write the message.
725                         retry = true;
726                     } else {
727                         throw Quiche.convertToException(res);
728                     }
729                 }
730             }
731         } finally {
732             if (sendSomething && connectionSend(conn) != SendResult.NONE) {
733                 flushParent();
734             }
735         }
736     }
737 
738     private static int sendDatagram(QuicheQuicConnection conn, ByteBuf buf) throws ClosedChannelException {
739         return Quiche.quiche_conn_dgram_send(connectionAddressChecked(conn),
740                 Quiche.readerMemoryAddress(buf), buf.readableBytes());
741     }
742 
743     @Override
744     public QuicChannelConfig config() {
745         return config;
746     }
747 
748     @Override
749     public boolean isOpen() {
750         return state != ChannelState.CLOSED;
751     }
752 
753     @Override
754     public boolean isActive() {
755         return state == ChannelState.ACTIVE;
756     }
757 
758     @Override
759     public ChannelMetadata metadata() {
760         return METADATA;
761     }
762 
763     /**
764      * This may call {@link #flush()} on the parent channel if needed. The flush may be delayed until the read loop
765      * is over.
766      */
767     private void flushParent() {
768         if (!inFireChannelReadCompleteQueue) {
769             forceFlushParent();
770         }
771     }
772 
773     /**
774      * Call {@link #flush()} on the parent channel.
775      */
776     private void forceFlushParent() {
777         parent().flush();
778     }
779 
780     private static long connectionAddressChecked(@Nullable QuicheQuicConnection conn) throws ClosedChannelException {
781         if (conn == null || conn.isFreed()) {
782             throw new ClosedChannelException();
783         }
784         return conn.address();
785     }
786 
787     boolean freeIfClosed() {
788         QuicheQuicConnection conn = connection;
789         if (conn == null || conn.isFreed()) {
790             return true;
791         }
792         if (conn.isClosed()) {
793             unsafe().close(newPromise());
794             return true;
795         }
796         return false;
797     }
798 
799     private void closeStreams() {
800         if (streams.isEmpty()) {
801             return;
802         }
803         final ClosedChannelException closedChannelException;
804         if (isTimedOut()) {
805             // Close the streams because of a timeout.
806             closedChannelException = new QuicTimeoutClosedChannelException();
807         } else {
808             closedChannelException = new ClosedChannelException();
809         }
810         // Make a copy to ensure we not run into a situation when we change the underlying iterator from
811         // another method and so run in an assert error.
812         for (QuicheQuicStreamChannel stream: streams.values().toArray(new QuicheQuicStreamChannel[0])) {
813             stream.unsafe().close(closedChannelException, voidPromise());
814         }
815         streams.clear();
816     }
817 
818     void streamPriority(long streamId, byte priority, boolean incremental) throws Exception {
819        int res = Quiche.quiche_conn_stream_priority(connectionAddressChecked(connection), streamId,
820                priority, incremental);
821        if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
822            throw Quiche.convertToException(res);
823        }
824     }
825 
826     void streamClosed(long streamId) {
827         streams.remove(streamId);
828     }
829 
830     boolean isStreamLocalCreated(long streamId) {
831         return (streamId & 0x1) == (server ? 1 : 0);
832     }
833 
834     QuicStreamType streamType(long streamId) {
835         return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
836     }
837 
838     void streamShutdown(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
839         QuicheQuicConnection conn = this.connection;
840         final long connectionAddress;
841         try {
842             connectionAddress = connectionAddressChecked(conn);
843         } catch (ClosedChannelException e) {
844             promise.setFailure(e);
845             return;
846         }
847         int res = 0;
848         if (read) {
849             res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_READ, err);
850         }
851         if (write) {
852             res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_WRITE, err);
853         }
854 
855         // As we called quiche_conn_stream_shutdown(...) we need to ensure we will call quiche_conn_send(...) either
856         // now or we will do so once we see the channelReadComplete event.
857         //
858         // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
859         if (connectionSend(conn) != SendResult.NONE) {
860             // Force the flush so the shutdown can be seen asap.
861             forceFlushParent();
862         }
863         if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
864             promise.setFailure(Quiche.convertToException(res));
865         } else {
866             promise.setSuccess();
867         }
868     }
869 
870     void streamSendFin(long streamId) throws Exception {
871         QuicheQuicConnection conn = connection;
872         try {
873             // Just write an empty buffer and set fin to true.
874             int res = streamSend0(conn, streamId, Unpooled.EMPTY_BUFFER, true);
875             if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
876                 throw Quiche.convertToException(res);
877             }
878         } finally {
879             // As we called quiche_conn_stream_send(...) we need to ensure we will call quiche_conn_send(...) either
880             // now or we will do so once we see the channelReadComplete event.
881             //
882             // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
883             if (connectionSend(conn) != SendResult.NONE) {
884                 flushParent();
885             }
886         }
887     }
888 
889     int streamSend(long streamId, ByteBuf buffer, boolean fin) throws ClosedChannelException {
890         QuicheQuicConnection conn = connection;
891         if (buffer.nioBufferCount() == 1) {
892             return streamSend0(conn, streamId, buffer, fin);
893         }
894         ByteBuffer[] nioBuffers  = buffer.nioBuffers();
895         int lastIdx = nioBuffers.length - 1;
896         int res = 0;
897         for (int i = 0; i < lastIdx; i++) {
898             ByteBuffer nioBuffer = nioBuffers[i];
899             while (nioBuffer.hasRemaining()) {
900                 int localRes = streamSend(conn, streamId, nioBuffer, false);
901                 if (localRes <= 0) {
902                     return res;
903                 }
904                 res += localRes;
905 
906                 nioBuffer.position(nioBuffer.position() + localRes);
907             }
908         }
909         int localRes = streamSend(conn, streamId, nioBuffers[lastIdx], fin);
910         if (localRes > 0) {
911             res += localRes;
912         }
913         return res;
914     }
915 
916     void connectionSendAndFlush() {
917         if (inFireChannelReadCompleteQueue || (reantranceGuard & IN_HANDLE_WRITABLE_STREAMS) != 0) {
918             return;
919         }
920         if (connectionSend(connection) != SendResult.NONE) {
921             flushParent();
922         }
923     }
924 
925     private int streamSend0(QuicheQuicConnection conn, long streamId, ByteBuf buffer, boolean fin)
926             throws ClosedChannelException {
927         return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
928                 Quiche.readerMemoryAddress(buffer), buffer.readableBytes(), fin);
929     }
930 
931     private int streamSend(QuicheQuicConnection conn, long streamId, ByteBuffer buffer, boolean fin)
932             throws ClosedChannelException {
933         return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
934                 Quiche.memoryAddressWithPosition(buffer), buffer.remaining(), fin);
935     }
936 
937     StreamRecvResult streamRecv(long streamId, ByteBuf buffer) throws Exception {
938         QuicheQuicConnection conn = connection;
939         long connAddr = connectionAddressChecked(conn);
940         if (finBuffer == null) {
941             finBuffer = alloc().directBuffer(1);
942         }
943         int writerIndex = buffer.writerIndex();
944         int recvLen = Quiche.quiche_conn_stream_recv(connAddr, streamId,
945                 Quiche.writerMemoryAddress(buffer), buffer.writableBytes(), Quiche.writerMemoryAddress(finBuffer));
946         if (recvLen == Quiche.QUICHE_ERR_DONE) {
947             return StreamRecvResult.DONE;
948         } else if (recvLen < 0) {
949             throw Quiche.convertToException(recvLen);
950         }
951         buffer.writerIndex(writerIndex + recvLen);
952         return finBuffer.getBoolean(0) ? StreamRecvResult.FIN : StreamRecvResult.OK;
953     }
954 
955     /**
956      * Receive some data on a QUIC connection.
957      */
958     void recv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
959         ((QuicChannelUnsafe) unsafe()).connectionRecv(sender, recipient, buffer);
960     }
961 
962     /**
963      * Return all source connection ids that are retired and so should be removed to map to the channel.
964      *
965      * @return retired ids.
966      */
967     List<ByteBuffer> retiredSourceConnectionId() {
968         QuicheQuicConnection connection = this.connection;
969         if (connection == null || connection.isFreed()) {
970             return Collections.emptyList();
971         }
972         long connAddr = connection.address();
973         assert connAddr != -1;
974         List<ByteBuffer> retiredSourceIds = null;
975         for (;;) {
976             byte[] retired = Quiche.quiche_conn_retired_scid_next(connAddr);
977             if (retired == null) {
978                 break;
979             }
980             if (retiredSourceIds == null) {
981                 retiredSourceIds = new ArrayList<>();
982             }
983             ByteBuffer retiredId = ByteBuffer.wrap(retired);
984             retiredSourceIds.add(retiredId);
985             sourceConnectionIds.remove(retiredId);
986         }
987         if (retiredSourceIds == null) {
988             return Collections.emptyList();
989         }
990         return retiredSourceIds;
991     }
992 
993     List<ByteBuffer> newSourceConnectionIds() {
994         if (connectionIdAddressGenerator != null && resetTokenGenerator != null) {
995             QuicheQuicConnection connection = this.connection;
996             if (connection == null || connection.isFreed()) {
997                 return Collections.emptyList();
998             }
999             long connAddr = connection.address();
1000             // Generate all extra source ids that we can provide. This will cause frames that need to be sent. Which
1001             // is the reason why we might need to call connectionSendAndFlush().
1002             int left = Quiche.quiche_conn_scids_left(connAddr);
1003             if (left > 0) {
1004                 QuicConnectionAddress sourceAddr = connection.sourceId();
1005                 if (sourceAddr == null) {
1006                     return Collections.emptyList();
1007                 }
1008                 List<ByteBuffer> generatedIds = new ArrayList<>(left);
1009                 boolean sendAndFlush = false;
1010                 ByteBuffer key = sourceAddr.id();
1011                 ByteBuf connIdBuffer = alloc().directBuffer(key.remaining());
1012 
1013                 byte[] resetTokenArray = new byte[Quic.RESET_TOKEN_LEN];
1014                 try {
1015                     do {
1016                         ByteBuffer srcId = connectionIdAddressGenerator.newId(key.duplicate(), key.remaining())
1017                                 .asReadOnlyBuffer();
1018                         connIdBuffer.clear();
1019                         connIdBuffer.writeBytes(srcId.duplicate());
1020                         ByteBuffer resetToken = resetTokenGenerator.newResetToken(srcId.duplicate());
1021                         resetToken.get(resetTokenArray);
1022                         long result = Quiche.quiche_conn_new_scid(
1023                                 connAddr, Quiche.memoryAddress(connIdBuffer, 0, connIdBuffer.readableBytes()),
1024                                 connIdBuffer.readableBytes(), resetTokenArray, false, -1);
1025                         if (result < 0) {
1026                             break;
1027                         }
1028                         sendAndFlush = true;
1029                         generatedIds.add(srcId.duplicate());
1030                         sourceConnectionIds.add(srcId);
1031                     } while (--left > 0);
1032                 } finally {
1033                     connIdBuffer.release();
1034                 }
1035 
1036                 if (sendAndFlush) {
1037                     connectionSendAndFlush();
1038                 }
1039                 return generatedIds;
1040             }
1041         }
1042         return Collections.emptyList();
1043     }
1044 
1045     void writable() {
1046         QuicheQuicConnection conn = connection;
1047         SendResult result = connectionSend(conn);
1048         handleWritableStreams(conn);
1049         if (connectionSend(conn) == SendResult.SOME) {
1050             result = SendResult.SOME;
1051         }
1052         if (result == SendResult.SOME) {
1053             // The writability changed so lets flush as fast as possible.
1054             forceFlushParent();
1055         }
1056         freeIfClosed();
1057     }
1058 
1059     int streamCapacity(long streamId) {
1060         QuicheQuicConnection conn = connection;
1061         if (conn.isClosed()) {
1062             return 0;
1063         }
1064         return Quiche.quiche_conn_stream_capacity(conn.address(), streamId);
1065     }
1066 
1067     private boolean handleWritableStreams(QuicheQuicConnection conn) {
1068         if (conn.isFreed()) {
1069             return false;
1070         }
1071         reantranceGuard |= IN_HANDLE_WRITABLE_STREAMS;
1072         try {
1073             long connAddr = conn.address();
1074             boolean mayNeedWrite = false;
1075 
1076             if (Quiche.quiche_conn_is_established(connAddr) ||
1077                     Quiche.quiche_conn_is_in_early_data(connAddr)) {
1078                 long writableIterator = Quiche.quiche_conn_writable(connAddr);
1079 
1080                 int totalWritable = 0;
1081                 try {
1082                     // For streams we always process all streams when at least on read was requested.
1083                     for (;;) {
1084                         int writable = Quiche.quiche_stream_iter_next(
1085                                 writableIterator, writableStreams);
1086                         for (int i = 0; i < writable; i++) {
1087                             long streamId = writableStreams[i];
1088                             QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1089                             if (streamChannel != null) {
1090                                 int capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
1091                                 if (streamChannel.writable(capacity)) {
1092                                     mayNeedWrite = true;
1093                                 }
1094                             }
1095                         }
1096                         if (writable > 0) {
1097                             totalWritable += writable;
1098                         }
1099                         if (writable < writableStreams.length) {
1100                             // We did handle all writable streams.
1101                             break;
1102                         }
1103                     }
1104                 } finally {
1105                     Quiche.quiche_stream_iter_free(writableIterator);
1106                 }
1107                 writableStreams = growIfNeeded(writableStreams, totalWritable);
1108             }
1109             return mayNeedWrite;
1110         } finally {
1111             reantranceGuard &= ~IN_HANDLE_WRITABLE_STREAMS;
1112         }
1113     }
1114 
1115     /**
1116      * Called once we receive a channelReadComplete event. This method will take care of calling
1117      * {@link ChannelPipeline#fireChannelReadComplete()} if needed and also to handle pending flushes of
1118      * writable {@link QuicheQuicStreamChannel}s.
1119      */
1120     void recvComplete() {
1121         try {
1122             QuicheQuicConnection conn = connection;
1123             if (conn.isFreed()) {
1124                 // Ensure we flush all pending writes.
1125                 forceFlushParent();
1126                 return;
1127             }
1128             fireChannelReadCompleteIfNeeded();
1129 
1130             // If we had called recv we need to ensure we call send as well.
1131             // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
1132             connectionSend(conn);
1133 
1134             // We are done with the read loop, flush all pending writes now.
1135             forceFlushParent();
1136             freeIfClosed();
1137         } finally {
1138             inFireChannelReadCompleteQueue = false;
1139         }
1140     }
1141 
1142     private void fireChannelReadCompleteIfNeeded() {
1143         if (fireChannelReadCompletePending) {
1144             fireChannelReadCompletePending = false;
1145             pipeline().fireChannelReadComplete();
1146         }
1147     }
1148 
1149     private void fireExceptionEvents(QuicheQuicConnection conn, Throwable cause) {
1150         if (cause instanceof SSLHandshakeException) {
1151             notifyAboutHandshakeCompletionIfNeeded(conn, (SSLHandshakeException) cause);
1152         }
1153         pipeline().fireExceptionCaught(cause);
1154     }
1155 
1156     private boolean runTasksDirectly() {
1157         return sslTaskExecutor == null || sslTaskExecutor == ImmediateExecutor.INSTANCE ||
1158                 sslTaskExecutor == ImmediateEventExecutor.INSTANCE;
1159     }
1160 
1161     private void runAllTaskSend(QuicheQuicConnection conn, Runnable task) {
1162         sslTaskExecutor.execute(decorateTaskSend(conn, task));
1163     }
1164 
1165     private void runAll(QuicheQuicConnection conn, Runnable task) {
1166         do {
1167             task.run();
1168         } while ((task = conn.sslTask()) != null);
1169     }
1170 
1171     private Runnable decorateTaskSend(QuicheQuicConnection conn, Runnable task) {
1172         return () -> {
1173             try {
1174                 runAll(conn, task);
1175             } finally {
1176                 // Move back to the EventLoop.
1177                 eventLoop().execute(() -> {
1178                     // Call connection send to continue handshake if needed.
1179                     if (connectionSend(conn) != SendResult.NONE) {
1180                         forceFlushParent();
1181                     }
1182                     freeIfClosed();
1183                 });
1184             }
1185         };
1186     }
1187 
    /**
     * Drains packets from quiche via {@code quiche_conn_send(...)} and writes them to the parent channel.
     * Consecutive packets of the same size are collected and written as one segmented packet created by the
     * given allocator; a single collected packet is written as a plain {@link DatagramPacket}.
     *
     * @param conn the connection to drain.
     * @param segmentedDatagramPacketAllocator used to create segmented packets and to limit the number of
     *                                         segments collected before writing.
     * @return {@code SendResult.NONE} if the connection is closed or nothing was written,
     *         {@code SendResult.SOME} if at least one packet was written, and {@code SendResult.CLOSE} if quiche
     *         returned an error for which {@code Quiche.shouldClose(...)} returned {@code true}.
     */
    private SendResult connectionSendSegments(QuicheQuicConnection conn,
                                              SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
        if (conn.isClosed()) {
            return SendResult.NONE;
        }
        // Packets that were produced but not written yet. The list is cleared every time its content was
        // handed over to the parent channel.
        List<ByteBuf> bufferList = new ArrayList<>(segmentedDatagramPacketAllocator.maxNumSegments());
        long connAddr = conn.address();
        int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
        SendResult sendResult = SendResult.NONE;
        boolean close = false;
        try {
            for (;;) {
                int len = calculateSendBufferLength(connAddr, maxDatagramSize);
                ByteBuf out = alloc().directBuffer(len);

                ByteBuffer sendInfo = conn.nextSendInfo();
                // Remember the remote address before the send, as it may be replaced below if the send info
                // changed. Everything already collected in bufferList is written to this address.
                InetSocketAddress sendToAddress = this.remote;

                int writerIndex = out.writerIndex();
                int written = Quiche.quiche_conn_send(
                        connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
                        Quiche.memoryAddressWithPosition(sendInfo));
                if (written == 0) {
                    out.release();
                    // No need to create a new datagram packet. Just try again.
                    continue;
                }
                final boolean done;
                if (written < 0) {
                    done = true;
                    if (written != Quiche.QUICHE_ERR_DONE) {
                        close = Quiche.shouldClose(written);
                        Exception e = Quiche.convertToException(written);
                        if (!tryFailConnectPromise(e)) {
                            // Only fire through the pipeline if this does not fail the connect promise.
                            fireExceptionEvents(conn, e);
                        }
                    }
                } else {
                    done = false;
                }
                int size = bufferList.size();
                if (done) {
                    // We are done, release the buffer and send what we did build up so far.
                    out.release();

                    switch (size) {
                        case 0:
                            // Nothing more to write.
                            break;
                        case 1:
                            // We can write a normal datagram packet.
                            parent().write(new DatagramPacket(bufferList.get(0), sendToAddress));
                            sendResult = SendResult.SOME;
                            break;
                        default:
                            int segmentSize = segmentSize(bufferList);
                            ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
                            // We had more than one buffer, create a segmented packet.
                            parent().write(segmentedDatagramPacketAllocator.newPacket(
                                    compositeBuffer, segmentSize, sendToAddress));
                            sendResult = SendResult.SOME;
                            break;
                    }
                    bufferList.clear();
                    if (close) {
                        sendResult = SendResult.CLOSE;
                    }
                    return sendResult;
                }
                out.writerIndex(writerIndex + written);

                // -1 means "keep collecting"; any other value means the collected buffers must be written now,
                // using this value as the segment size.
                int segmentSize = -1;
                if (conn.isSendInfoChanged()) {
                    // Change the cached address and let the user know there was a connection migration.
                    remote = QuicheSendInfo.getToAddress(sendInfo);
                    local = QuicheSendInfo.getFromAddress(sendInfo);

                    if (size > 0) {
                        // We have something in the out list already, we need to send this now and so we set the
                        // segmentSize.
                        segmentSize = segmentSize(bufferList);
                    }
                } else if (size > 0) {
                    int lastReadable = segmentSize(bufferList);
                    // Check if we either need to send now because the last buffer we added has a smaller size then this
                    // one or if we reached the maximum number of segments that we can send.
                    if (lastReadable != out.readableBytes() ||
                            size == segmentedDatagramPacketAllocator.maxNumSegments()) {
                        segmentSize = lastReadable;
                    }
                }

                // If the segmentSize is not -1 we know we need to send now what was in the out list.
                if (segmentSize != -1) {
                    final boolean stop;
                    if (size == 1) {
                        // Only one buffer in the out list, there is no need to use segments.
                        stop = writePacket(new DatagramPacket(
                                bufferList.get(0), sendToAddress), maxDatagramSize, len);
                    } else {
                        // Create a packet with segments in.
                        ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
                        stop = writePacket(segmentedDatagramPacketAllocator.newPacket(
                                compositeBuffer, segmentSize, sendToAddress), maxDatagramSize, len);
                    }
                    bufferList.clear();
                    sendResult = SendResult.SOME;

                    if (stop) {
                        // Nothing left in the window, continue later. That said we still need to also
                        // write the previous filled out buffer as otherwise we would either leak or need
                        // to drop it and so produce some loss.
                        // NOTE(review): sendToAddress was captured before the possible address change above;
                        // confirm that "out" is really meant to be written to the previous address here.
                        if (out.isReadable()) {
                            parent().write(new DatagramPacket(out, sendToAddress));
                        } else {
                            out.release();
                        }
                        // NOTE(review): "close" is only assigned in the "done" branch, which always returns,
                        // so it looks like it can never be true at this point - confirm.
                        if (close) {
                            sendResult = SendResult.CLOSE;
                        }
                        return sendResult;
                    }
                }
                // Let's add a touch with the bufferList as a hint. This will help us to debug leaks if there
                // are any.
                out.touch(bufferList);
                // store for later, so we can make use of segments.
                bufferList.add(out);
            }
        } finally {
            // NOOP
        }
    }
1322 
1323     private static int segmentSize(List<ByteBuf> bufferList) {
1324         assert !bufferList.isEmpty();
1325         int size = bufferList.size();
1326         return bufferList.get(size - 1).readableBytes();
1327     }
1328 
1329     private SendResult connectionSendSimple(QuicheQuicConnection conn) {
1330         if (conn.isClosed()) {
1331             return SendResult.NONE;
1332         }
1333         long connAddr = conn.address();
1334         SendResult sendResult = SendResult.NONE;
1335         boolean close = false;
1336         int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1337         for (;;) {
1338             ByteBuffer sendInfo = conn.nextSendInfo();
1339 
1340             int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1341             ByteBuf out = alloc().directBuffer(len);
1342             int writerIndex = out.writerIndex();
1343 
1344             int written = Quiche.quiche_conn_send(
1345                     connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1346                     Quiche.memoryAddressWithPosition(sendInfo));
1347 
1348             if (written == 0) {
1349                 // No need to create a new datagram packet. Just release and try again.
1350                 out.release();
1351                 continue;
1352             }
1353             if (written < 0) {
1354                 out.release();
1355                 if (written != Quiche.QUICHE_ERR_DONE) {
1356                     close = Quiche.shouldClose(written);
1357 
1358                     Exception e = Quiche.convertToException(written);
1359                     if (!tryFailConnectPromise(e)) {
1360                         fireExceptionEvents(conn, e);
1361                     }
1362                 }
1363                 break;
1364             }
1365             if (conn.isSendInfoChanged()) {
1366                 // Change the cached address
1367                 remote = QuicheSendInfo.getToAddress(sendInfo);
1368                 local = QuicheSendInfo.getFromAddress(sendInfo);
1369             }
1370             out.writerIndex(writerIndex + written);
1371             boolean stop = writePacket(new DatagramPacket(out, remote), maxDatagramSize, len);
1372             sendResult = SendResult.SOME;
1373             if (stop) {
1374                 // Nothing left in the window, continue later
1375                 break;
1376             }
1377         }
1378         if (close) {
1379             sendResult = SendResult.CLOSE;
1380         }
1381         return sendResult;
1382     }
1383 
1384     private boolean writePacket(DatagramPacket packet, int maxDatagramSize, int len) {
1385         ChannelFuture future = parent().write(packet);
1386         if (isSendWindowUsed(maxDatagramSize, len)) {
1387             // Nothing left in the window, continue later
1388             future.addListener(continueSendingListener);
1389             return true;
1390         }
1391         return false;
1392     }
1393 
1394     private static boolean isSendWindowUsed(int maxDatagramSize, int len) {
1395         return len < maxDatagramSize;
1396     }
1397 
1398     private static int calculateSendBufferLength(long connAddr, int maxDatagramSize) {
1399         int len = Math.min(maxDatagramSize, Quiche.quiche_conn_send_quantum(connAddr));
1400         if (len <= 0) {
1401             // If there is no room left we just return some small number to reduce the risk of packet drop
1402             // while still be able to attach the listener to the write future.
1403             // We use the value of 8 because such an allocation will be cheap to serve from the
1404             // PooledByteBufAllocator while still serve our need.
1405             return 8;
1406         }
1407         return len;
1408     }
1409 
    /**
     * Write datagrams if needed and process or schedule all SSL tasks that were created while doing so.
     *
     * @param conn the connection to send for.
     * @return {@code SendResult.NONE} if the connection was freed already or if this method is re-entered while
     *         a call is in progress, otherwise the result of {@code connectionSendSegments(...)} or
     *         {@code connectionSendSimple(...)}. Callers in this class treat any result other than
     *         {@code SendResult.NONE} as the need to call {@link Channel#flush()} on the parent at some point.
     */
    private SendResult connectionSend(QuicheQuicConnection conn) {
        if (conn.isFreed()) {
            return SendResult.NONE;
        }
        // Guard against re-entrance: a nested call only notifies about early data and returns.
        if ((reantranceGuard & IN_CONNECTION_SEND) != 0) {
            // Let's notify about early data if needed.
            notifyEarlyDataReadyIfNeeded(conn);
            return SendResult.NONE;
        }

        reantranceGuard |= IN_CONNECTION_SEND;
        try {
            SendResult sendResult;
            SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator =
                    config.getSegmentedDatagramPacketAllocator();
            // Only use segments if the configured allocator supports at least one of them.
            if (segmentedDatagramPacketAllocator.maxNumSegments() > 0) {
                sendResult = connectionSendSegments(conn, segmentedDatagramPacketAllocator);
            } else {
                sendResult = connectionSendSimple(conn);
            }

            // Process / schedule all tasks that were created.

            Runnable task = conn.sslTask();
            if (task != null) {
                if (runTasksDirectly()) {
                    // Consume all tasks
                    do {
                        task.run();
                        // Notify about early data ready if needed.
                        notifyEarlyDataReadyIfNeeded(conn);
                    } while ((task = conn.sslTask()) != null);

                    // Let's try again sending after we did process all tasks.
                    // We schedule this on the EventLoop as otherwise we will get into trouble with re-entrance.
                    eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            // Call connection send to continue handshake if needed.
                            if (connectionSend(conn) != SendResult.NONE) {
                                forceFlushParent();
                            }
                            freeIfClosed();
                        }
                    });
                } else {
                    // Hand the tasks over to the sslTaskExecutor; the decorated task sends again on the
                    // EventLoop once all of them have run.
                    runAllTaskSend(conn, task);
                }
            } else {
                // Notify about early data ready if needed.
                notifyEarlyDataReadyIfNeeded(conn);
            }

            // Whenever we called connection_send we should also schedule the timer if needed.
            timeoutHandler.scheduleTimeout();
            return sendResult;
        } finally {
            // Always clear the guard bit again, even if one of the calls above throws.
            reantranceGuard &= ~IN_CONNECTION_SEND;
        }
    }
1474 
1475     private final class QuicChannelUnsafe extends AbstractChannel.AbstractUnsafe {
1476 
1477         void connectStream(QuicStreamType type, @Nullable ChannelHandler handler,
1478                            Promise<QuicStreamChannel> promise) {
1479             if (!promise.setUncancellable()) {
1480                 return;
1481             }
1482             long streamId = idGenerator.nextStreamId(type == QuicStreamType.BIDIRECTIONAL);
1483 
1484             try {
1485                 int res = streamSend0(connection, streamId, Unpooled.EMPTY_BUFFER, false);
1486                 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
1487                     throw Quiche.convertToException(res);
1488                 }
1489             } catch (Exception e) {
1490                 promise.setFailure(e);
1491                 return;
1492             }
1493             if (type == QuicStreamType.UNIDIRECTIONAL) {
1494                 UNI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1495             } else {
1496                 BIDI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1497             }
1498             QuicheQuicStreamChannel streamChannel = addNewStreamChannel(streamId);
1499             if (handler != null) {
1500                 streamChannel.pipeline().addLast(handler);
1501             }
1502             eventLoop().register(streamChannel).addListener((ChannelFuture f) -> {
1503                 if (f.isSuccess()) {
1504                     promise.setSuccess(streamChannel);
1505                 } else {
1506                     promise.setFailure(f.cause());
1507                     streams.remove(streamId);
1508                 }
1509             });
1510         }
1511 
1512         @Override
1513         public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
1514             assert eventLoop().inEventLoop();
1515             if (!channelPromise.setUncancellable()) {
1516                 return;
1517             }
1518             if (server) {
1519                 channelPromise.setFailure(new UnsupportedOperationException());
1520                 return;
1521             }
1522 
1523             if (connectPromise != null) {
1524                 channelPromise.setFailure(new ConnectionPendingException());
1525                 return;
1526             }
1527 
1528             if (remote instanceof QuicConnectionAddress) {
1529                 if (!sourceConnectionIds.isEmpty()) {
1530                     // If a key is assigned we know this channel was already connected.
1531                     channelPromise.setFailure(new AlreadyConnectedException());
1532                     return;
1533                 }
1534 
1535                 connectAddress = (QuicConnectionAddress) remote;
1536                 connectPromise = channelPromise;
1537 
1538                 // Schedule connect timeout.
1539                 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1540                 if (connectTimeoutMillis > 0) {
1541                     connectTimeoutFuture = eventLoop().schedule(() -> {
1542                         ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1543                         if (connectPromise != null && !connectPromise.isDone()
1544                                 && connectPromise.tryFailure(new ConnectTimeoutException(
1545                                 "connection timed out: " + remote))) {
1546                             close(voidPromise());
1547                         }
1548                     }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1549                 }
1550 
1551                 connectPromise.addListener((ChannelFuture future) -> {
1552                     if (future.isCancelled()) {
1553                         if (connectTimeoutFuture != null) {
1554                             connectTimeoutFuture.cancel(false);
1555                         }
1556                         connectPromise = null;
1557                         close(voidPromise());
1558                     }
1559                 });
1560 
1561                 parent().connect(new QuicheQuicChannelAddress(QuicheQuicChannel.this))
1562                         .addListener(f -> {
1563                             ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1564                             if (connectPromise != null && !f.isSuccess()) {
1565                                 connectPromise.tryFailure(f.cause());
1566                                 // close everything after notify about failure.
1567                                 unsafe().closeForcibly();
1568                             }
1569                         });
1570                 return;
1571             }
1572 
1573             channelPromise.setFailure(new UnsupportedOperationException());
1574         }
1575 
1576         private void fireConnectCloseEventIfNeeded(QuicheQuicConnection conn) {
1577             if (connectionCloseEvent == null && !conn.isFreed()) {
1578                 connectionCloseEvent = Quiche.quiche_conn_peer_error(conn.address());
1579                 if (connectionCloseEvent != null) {
1580                     pipeline().fireUserEventTriggered(connectionCloseEvent);
1581                 }
1582             }
1583         }
1584 
1585         void connectionRecv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
1586             QuicheQuicConnection conn = QuicheQuicChannel.this.connection;
1587             if (conn.isFreed()) {
1588                 return;
1589             }
1590             int bufferReadable = buffer.readableBytes();
1591             if (bufferReadable == 0) {
1592                 // Nothing to do here. Just return...
1593                 // See also https://github.com/cloudflare/quiche/issues/817
1594                 return;
1595             }
1596 
1597             reantranceGuard |= IN_RECV;
1598             boolean close = false;
1599             try {
1600                 ByteBuf tmpBuffer = null;
1601                 // We need to make a copy if the buffer is read only as recv(...) may modify the input buffer as well.
1602                 // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.recv
1603                 if (buffer.isReadOnly()) {
1604                     tmpBuffer = alloc().directBuffer(buffer.readableBytes());
1605                     tmpBuffer.writeBytes(buffer);
1606                     buffer = tmpBuffer;
1607                 }
1608                 long memoryAddress = Quiche.readerMemoryAddress(buffer);
1609 
1610                 ByteBuffer recvInfo = conn.nextRecvInfo();
1611                 QuicheRecvInfo.setRecvInfo(recvInfo, sender, recipient);
1612 
1613                 remote = sender;
1614                 local = recipient;
1615 
1616                 try {
1617                     do  {
1618                         // Call quiche_conn_recv(...) until we consumed all bytes or we did receive some error.
1619                         int res = Quiche.quiche_conn_recv(conn.address(), memoryAddress, bufferReadable,
1620                                 Quiche.memoryAddressWithPosition(recvInfo));
1621                         final boolean done;
1622                         if (res < 0) {
1623                             done = true;
1624                             if (res != Quiche.QUICHE_ERR_DONE) {
1625                                 close = Quiche.shouldClose(res);
1626                                 Exception e = Quiche.convertToException(res);
1627                                 if (tryFailConnectPromise(e)) {
1628                                     break;
1629                                 }
1630                                 fireExceptionEvents(conn, e);
1631                             }
1632                         } else {
1633                             done = false;
1634                         }
1635                         // Process / schedule all tasks that were created.
1636                         Runnable task = conn.sslTask();
1637                         if (task != null) {
1638                             if (runTasksDirectly()) {
1639                                 // Consume all tasks
1640                                 do {
1641                                     task.run();
1642                                 } while ((task = conn.sslTask()) != null);
1643                                 processReceived(conn);
1644                             } else {
1645                                 runAllTaskRecv(conn, task);
1646                             }
1647                         } else {
1648                             processReceived(conn);
1649                         }
1650 
1651                         if (done) {
1652                             break;
1653                         }
1654                         memoryAddress += res;
1655                         bufferReadable -= res;
1656                     } while (bufferReadable > 0 && !conn.isFreed());
1657                 } finally {
1658                     buffer.skipBytes((int) (memoryAddress - Quiche.readerMemoryAddress(buffer)));
1659                     if (tmpBuffer != null) {
1660                         tmpBuffer.release();
1661                     }
1662                 }
1663                 if (close) {
1664                     // Let's close now as there is no way to recover
1665                     unsafe().close(newPromise());
1666                 }
1667             } finally {
1668                 reantranceGuard &= ~IN_RECV;
1669             }
1670         }
1671 
1672         private void processReceived(QuicheQuicConnection conn) {
1673             // Handle pending channelActive if needed.
1674             if (handlePendingChannelActive(conn)) {
1675                 // Connection was closed right away.
1676                 return;
1677             }
1678 
1679             notifyAboutHandshakeCompletionIfNeeded(conn, null);
1680             fireConnectCloseEventIfNeeded(conn);
1681 
1682             if (conn.isFreed()) {
1683                 return;
1684             }
1685 
1686             long connAddr = conn.address();
1687             if (Quiche.quiche_conn_is_established(connAddr) ||
1688                     Quiche.quiche_conn_is_in_early_data(connAddr)) {
1689                 long uniLeftOld = uniStreamsLeft;
1690                 long bidiLeftOld = bidiStreamsLeft;
1691                 // Only fetch new stream info when we used all our credits
1692                 if (uniLeftOld == 0 || bidiLeftOld == 0) {
1693                     long uniLeft = Quiche.quiche_conn_peer_streams_left_uni(connAddr);
1694                     long bidiLeft = Quiche.quiche_conn_peer_streams_left_bidi(connAddr);
1695                     uniStreamsLeft = uniLeft;
1696                     bidiStreamsLeft = bidiLeft;
1697                     if (uniLeftOld != uniLeft || bidiLeftOld != bidiLeft) {
1698                         pipeline().fireUserEventTriggered(QuicStreamLimitChangedEvent.INSTANCE);
1699                     }
1700                 }
1701 
1702                 handlePathEvents(conn);
1703 
1704                 if (handleWritableStreams(conn)) {
1705                     // Some data was produced, let's flush.
1706                     flushParent();
1707                 }
1708 
1709                 datagramReadable = true;
1710                 streamReadable = true;
1711 
1712                 recvDatagram(conn);
1713                 recvStream(conn);
1714             }
1715         }
1716 
1717         private void handlePathEvents(QuicheQuicConnection conn) {
1718             long event;
1719             while (!conn.isFreed() && (event = Quiche.quiche_conn_path_event_next(conn.address())) > 0) {
1720                 try {
1721                     int type = Quiche.quiche_path_event_type(event);
1722 
1723                     if (type == Quiche.QUICHE_PATH_EVENT_NEW) {
1724                         Object[] ret = Quiche.quiche_path_event_new(event);
1725                         InetSocketAddress local = (InetSocketAddress) ret[0];
1726                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1727                         pipeline().fireUserEventTriggered(new QuicPathEvent.New(local, peer));
1728                     } else if (type == Quiche.QUICHE_PATH_EVENT_VALIDATED) {
1729                         Object[] ret = Quiche.quiche_path_event_validated(event);
1730                         InetSocketAddress local = (InetSocketAddress) ret[0];
1731                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1732                         pipeline().fireUserEventTriggered(new QuicPathEvent.Validated(local, peer));
1733                     } else if (type == Quiche.QUICHE_PATH_EVENT_FAILED_VALIDATION) {
1734                         Object[] ret = Quiche.quiche_path_event_failed_validation(event);
1735                         InetSocketAddress local = (InetSocketAddress) ret[0];
1736                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1737                         pipeline().fireUserEventTriggered(new QuicPathEvent.FailedValidation(local, peer));
1738                     } else if (type == Quiche.QUICHE_PATH_EVENT_CLOSED) {
1739                         Object[] ret = Quiche.quiche_path_event_closed(event);
1740                         InetSocketAddress local = (InetSocketAddress) ret[0];
1741                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1742                         pipeline().fireUserEventTriggered(new QuicPathEvent.Closed(local, peer));
1743                     } else if (type == Quiche.QUICHE_PATH_EVENT_REUSED_SOURCE_CONNECTION_ID) {
1744                         Object[] ret = Quiche.quiche_path_event_reused_source_connection_id(event);
1745                         Long seq = (Long) ret[0];
1746                         InetSocketAddress localOld = (InetSocketAddress) ret[1];
1747                         InetSocketAddress peerOld = (InetSocketAddress) ret[2];
1748                         InetSocketAddress local = (InetSocketAddress) ret[3];
1749                         InetSocketAddress peer = (InetSocketAddress) ret[4];
1750                         pipeline().fireUserEventTriggered(
1751                                 new QuicPathEvent.ReusedSourceConnectionId(seq, localOld, peerOld, local, peer));
1752                     } else if (type == Quiche.QUICHE_PATH_EVENT_PEER_MIGRATED) {
1753                         Object[] ret = Quiche.quiche_path_event_peer_migrated(event);
1754                         InetSocketAddress local = (InetSocketAddress) ret[0];
1755                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1756                         pipeline().fireUserEventTriggered(new QuicPathEvent.PeerMigrated(local, peer));
1757                     }
1758                 } finally {
1759                     Quiche.quiche_path_event_free(event);
1760                 }
1761             }
1762         }
1763 
1764         private void runAllTaskRecv(QuicheQuicConnection conn, Runnable task) {
1765             sslTaskExecutor.execute(decorateTaskRecv(conn, task));
1766         }
1767 
1768         private Runnable decorateTaskRecv(QuicheQuicConnection conn, Runnable task) {
1769             return () -> {
1770                 try {
1771                     runAll(conn, task);
1772                 } finally {
1773                     // Move back to the EventLoop.
1774                     eventLoop().execute(() -> {
1775                         if (!conn.isFreed()) {
1776                             processReceived(conn);
1777 
1778                             // Call connection send to continue handshake if needed.
1779                             if (connectionSend(conn) != SendResult.NONE) {
1780                                 forceFlushParent();
1781                             }
1782 
1783                             freeIfClosed();
1784                         }
1785                     });
1786                 }
1787             };
1788         }
1789         void recv() {
1790             QuicheQuicConnection conn = connection;
1791             if ((reantranceGuard & IN_RECV) != 0 || conn.isFreed()) {
1792                 return;
1793             }
1794 
1795             long connAddr = conn.address();
1796             // Check if we can read anything yet.
1797             if (!Quiche.quiche_conn_is_established(connAddr) &&
1798                     !Quiche.quiche_conn_is_in_early_data(connAddr)) {
1799                 return;
1800             }
1801 
1802             reantranceGuard |= IN_RECV;
1803             try {
1804                 recvDatagram(conn);
1805                 recvStream(conn);
1806             } finally {
1807                 fireChannelReadCompleteIfNeeded();
1808                 reantranceGuard &= ~IN_RECV;
1809             }
1810         }
1811 
1812         private void recvStream(QuicheQuicConnection conn) {
1813             if (conn.isFreed()) {
1814                 return;
1815             }
1816             long connAddr = conn.address();
1817             long readableIterator = Quiche.quiche_conn_readable(connAddr);
1818             int totalReadable = 0;
1819             if (readableIterator != -1) {
1820                 try {
1821                     // For streams we always process all streams when at least on read was requested.
1822                     if (recvStreamPending && streamReadable) {
1823                         for (;;) {
1824                             int readable = Quiche.quiche_stream_iter_next(
1825                                     readableIterator, readableStreams);
1826                             for (int i = 0; i < readable; i++) {
1827                                 long streamId = readableStreams[i];
1828                                 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1829                                 if (streamChannel == null) {
1830                                     recvStreamPending = false;
1831                                     fireChannelReadCompletePending = true;
1832                                     streamChannel = addNewStreamChannel(streamId);
1833                                     streamChannel.readable();
1834                                     pipeline().fireChannelRead(streamChannel);
1835                                 } else {
1836                                     streamChannel.readable();
1837                                 }
1838                             }
1839                             if (readable < readableStreams.length) {
1840                                 // We did consume all readable streams.
1841                                 streamReadable = false;
1842                                 break;
1843                             }
1844                             if (readable > 0) {
1845                                 totalReadable += readable;
1846                             }
1847                         }
1848                     }
1849                 } finally {
1850                     Quiche.quiche_stream_iter_free(readableIterator);
1851                 }
1852                 readableStreams = growIfNeeded(readableStreams, totalReadable);
1853             }
1854         }
1855 
1856         private void recvDatagram(QuicheQuicConnection conn) {
1857             if (!supportsDatagram) {
1858                 return;
1859             }
1860             while (recvDatagramPending && datagramReadable && !conn.isFreed()) {
1861                 @SuppressWarnings("deprecation")
1862                 RecvByteBufAllocator.Handle recvHandle = recvBufAllocHandle();
1863                 recvHandle.reset(config());
1864 
1865                 int numMessagesRead = 0;
1866                 do {
1867                     long connAddr = conn.address();
1868                     int len = Quiche.quiche_conn_dgram_recv_front_len(connAddr);
1869                     if (len == Quiche.QUICHE_ERR_DONE) {
1870                         datagramReadable = false;
1871                         return;
1872                     }
1873 
1874                     ByteBuf datagramBuffer = alloc().directBuffer(len);
1875                     recvHandle.attemptedBytesRead(datagramBuffer.writableBytes());
1876                     int writerIndex = datagramBuffer.writerIndex();
1877                     long memoryAddress = Quiche.writerMemoryAddress(datagramBuffer);
1878 
1879                     int written = Quiche.quiche_conn_dgram_recv(connAddr,
1880                             memoryAddress, datagramBuffer.writableBytes());
1881                     if (written < 0) {
1882                         datagramBuffer.release();
1883                         if (written == Quiche.QUICHE_ERR_DONE) {
1884                             // We did consume all datagram packets.
1885                             datagramReadable = false;
1886                             break;
1887                         }
1888                         pipeline().fireExceptionCaught(Quiche.convertToException(written));
1889                     }
1890                     recvHandle.lastBytesRead(written);
1891                     recvHandle.incMessagesRead(1);
1892                     numMessagesRead++;
1893                     datagramBuffer.writerIndex(writerIndex + written);
1894                     recvDatagramPending = false;
1895                     fireChannelReadCompletePending = true;
1896 
1897                     pipeline().fireChannelRead(datagramBuffer);
1898                 } while (recvHandle.continueReading() && !conn.isFreed());
1899                 recvHandle.readComplete();
1900 
1901                 // Check if we produced any messages.
1902                 if (numMessagesRead > 0) {
1903                     fireChannelReadCompleteIfNeeded();
1904                 }
1905             }
1906         }
1907 
1908         private boolean handlePendingChannelActive(QuicheQuicConnection conn) {
1909             if (conn.isFreed() || state == ChannelState.CLOSED) {
1910                 return true;
1911             }
1912             if (server) {
1913                 if (state == ChannelState.OPEN && Quiche.quiche_conn_is_established(conn.address())) {
1914                     // We didn't notify before about channelActive... Update state and fire the event.
1915                     state = ChannelState.ACTIVE;
1916 
1917                     pipeline().fireChannelActive();
1918                     notifyAboutHandshakeCompletionIfNeeded(conn, null);
1919                     fireDatagramExtensionEvent(conn);
1920                 }
1921             } else if (connectPromise != null && Quiche.quiche_conn_is_established(conn.address())) {
1922                 ChannelPromise promise = connectPromise;
1923                 connectPromise = null;
1924                 state = ChannelState.ACTIVE;
1925 
1926                 boolean promiseSet = promise.trySuccess();
1927                 pipeline().fireChannelActive();
1928                 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1929                 fireDatagramExtensionEvent(conn);
1930                 if (!promiseSet) {
1931                     fireConnectCloseEventIfNeeded(conn);
1932                     this.close(this.voidPromise());
1933                     return true;
1934                 }
1935             }
1936             return false;
1937         }
1938 
1939         private void fireDatagramExtensionEvent(QuicheQuicConnection conn) {
1940             if (conn.isClosed()) {
1941                 return;
1942             }
1943             long connAddr = conn.address();
1944             int len = Quiche.quiche_conn_dgram_max_writable_len(connAddr);
1945             // QUICHE_ERR_DONE means the remote peer does not support the extension.
1946             if (len != Quiche.QUICHE_ERR_DONE) {
1947                 pipeline().fireUserEventTriggered(new QuicDatagramExtensionEvent(len));
1948             }
1949         }
1950 
1951         private QuicheQuicStreamChannel addNewStreamChannel(long streamId) {
1952             QuicheQuicStreamChannel streamChannel = new QuicheQuicStreamChannel(
1953                     QuicheQuicChannel.this, streamId);
1954             QuicheQuicStreamChannel old = streams.put(streamId, streamChannel);
1955             assert old == null;
1956             streamChannel.writable(streamCapacity(streamId));
1957             return streamChannel;
1958         }
1959     }
1960 
1961     /**
1962      * Finish the connect operation of a client channel.
1963      */
1964     void finishConnect() {
1965         assert !server;
1966         assert connection != null;
1967         if (connectionSend(connection) != SendResult.NONE) {
1968             flushParent();
1969         }
1970     }
1971 
1972     private void notifyEarlyDataReadyIfNeeded(QuicheQuicConnection conn) {
1973         if (!server && !earlyDataReadyNotified &&
1974                 !conn.isFreed() && Quiche.quiche_conn_is_in_early_data(conn.address())) {
1975             earlyDataReadyNotified = true;
1976             pipeline().fireUserEventTriggered(SslEarlyDataReadyEvent.INSTANCE);
1977         }
1978     }
1979 
1980     private final class TimeoutHandler implements Runnable {
1981         private ScheduledFuture<?> timeoutFuture;
1982 
1983         @Override
1984         public void run() {
1985             QuicheQuicConnection conn = connection;
1986             if (conn.isFreed()) {
1987                 return;
1988             }
1989             if (!freeIfClosed()) {
1990                 long connAddr = conn.address();
1991                 timeoutFuture = null;
1992                 // Notify quiche there was a timeout.
1993                 Quiche.quiche_conn_on_timeout(connAddr);
1994                 if (!freeIfClosed()) {
1995                     // We need to call connectionSend when a timeout was triggered.
1996                     // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send.
1997                     if (connectionSend(conn) != SendResult.NONE) {
1998                         flushParent();
1999                     }
2000                     boolean closed = freeIfClosed();
2001                     if (!closed) {
2002                         // The connection is alive, reschedule.
2003                         scheduleTimeout();
2004                     }
2005                 }
2006             }
2007         }
2008 
2009         // Schedule timeout.
2010         // See https://docs.rs/quiche/0.6.0/quiche/#generating-outgoing-packets
2011         void scheduleTimeout() {
2012             QuicheQuicConnection conn = connection;
2013             if (conn.isFreed()) {
2014                 cancel();
2015                 return;
2016             }
2017             if (conn.isClosed()) {
2018                 cancel();
2019                 unsafe().close(newPromise());
2020                 return;
2021             }
2022             long nanos = Quiche.quiche_conn_timeout_as_nanos(conn.address());
2023             if (nanos < 0 || nanos == Long.MAX_VALUE) {
2024                 // No timeout needed.
2025                 cancel();
2026                 return;
2027             }
2028             if (timeoutFuture == null) {
2029                 timeoutFuture = eventLoop().schedule(this,
2030                         nanos, TimeUnit.NANOSECONDS);
2031             } else {
2032                 long remaining = timeoutFuture.getDelay(TimeUnit.NANOSECONDS);
2033                 if (remaining <= 0) {
2034                     // This means the timer already elapsed. In this case just cancel the future and call run()
2035                     // directly. This will ensure we correctly call quiche_conn_on_timeout() etc.
2036                     cancel();
2037                     run();
2038                 } else if (remaining > nanos) {
2039                     // The new timeout is smaller then what was scheduled before. Let's cancel the old timeout
2040                     // and schedule a new one.
2041                     cancel();
2042                     timeoutFuture = eventLoop().schedule(this, nanos, TimeUnit.NANOSECONDS);
2043                 }
2044             }
2045         }
2046 
2047         void cancel() {
2048             if (timeoutFuture != null) {
2049                 timeoutFuture.cancel(false);
2050                 timeoutFuture = null;
2051             }
2052         }
2053     }
2054 
2055     @Override
2056     public Future<QuicConnectionStats> collectStats(Promise<QuicConnectionStats> promise) {
2057         if (eventLoop().inEventLoop()) {
2058             collectStats0(promise);
2059         } else {
2060             eventLoop().execute(() -> collectStats0(promise));
2061         }
2062         return promise;
2063     }
2064 
2065     private void collectStats0(Promise<QuicConnectionStats> promise) {
2066         QuicheQuicConnection conn = connection;
2067         if (conn.isFreed()) {
2068             promise.setSuccess(statsAtClose);
2069             return;
2070         }
2071 
2072         collectStats0(connection, promise);
2073     }
2074 
2075     @Nullable
2076     private QuicConnectionStats collectStats0(QuicheQuicConnection connection, Promise<QuicConnectionStats> promise) {
2077         final long[] stats = Quiche.quiche_conn_stats(connection.address());
2078         if (stats == null) {
2079             promise.setFailure(new IllegalStateException("native quiche_conn_stats(...) failed"));
2080             return null;
2081         }
2082 
2083         final QuicheQuicConnectionStats connStats =
2084             new QuicheQuicConnectionStats(stats);
2085         promise.setSuccess(connStats);
2086         return connStats;
2087     }
2088 
2089     @Override
2090     public Future<QuicConnectionPathStats> collectPathStats(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2091         if (eventLoop().inEventLoop()) {
2092             collectPathStats0(pathIdx, promise);
2093         } else {
2094             eventLoop().execute(() -> collectPathStats0(pathIdx, promise));
2095         }
2096         return promise;
2097     }
2098 
2099     private void collectPathStats0(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2100         QuicheQuicConnection conn = connection;
2101         if (conn.isFreed()) {
2102             promise.setFailure(new IllegalStateException("Connection is closed"));
2103             return;
2104         }
2105 
2106         final Object[] stats = Quiche.quiche_conn_path_stats(connection.address(), pathIdx);
2107         if (stats == null) {
2108             promise.setFailure(new IllegalStateException("native quiche_conn_path_stats(...) failed"));
2109             return;
2110         }
2111         promise.setSuccess(new QuicheQuicConnectionPathStats(stats));
2112     }
2113 
2114     @Override
2115     public QuicTransportParameters peerTransportParameters() {
2116         return connection.peerParameters();
2117     }
2118 }