View Javadoc
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.quic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.AbstractChannel;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOption;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.RecvByteBufAllocator;
35  import io.netty.channel.socket.DatagramPacket;
36  import io.netty.handler.ssl.SniCompletionEvent;
37  import io.netty.handler.ssl.SslHandshakeCompletionEvent;
38  import io.netty.util.AttributeKey;
39  import io.netty.util.collection.LongObjectHashMap;
40  import io.netty.util.collection.LongObjectMap;
41  import io.netty.util.concurrent.Future;
42  import io.netty.util.concurrent.ImmediateEventExecutor;
43  import io.netty.util.concurrent.ImmediateExecutor;
44  import io.netty.util.concurrent.Promise;
45  import io.netty.util.internal.StringUtil;
46  import io.netty.util.internal.logging.InternalLogger;
47  import io.netty.util.internal.logging.InternalLoggerFactory;
48  import org.jetbrains.annotations.Nullable;
49  
50  import javax.net.ssl.SSLEngine;
51  import javax.net.ssl.SSLHandshakeException;
52  import java.io.File;
53  import java.net.ConnectException;
54  import java.net.InetSocketAddress;
55  import java.net.SocketAddress;
56  import java.nio.BufferUnderflowException;
57  import java.nio.ByteBuffer;
58  import java.nio.channels.AlreadyConnectedException;
59  import java.nio.channels.ClosedChannelException;
60  import java.nio.channels.ConnectionPendingException;
61  import java.util.ArrayList;
62  import java.util.Collections;
63  import java.util.HashSet;
64  import java.util.List;
65  import java.util.Map;
66  import java.util.Set;
67  import java.util.concurrent.Executor;
68  import java.util.concurrent.ScheduledFuture;
69  import java.util.concurrent.TimeUnit;
70  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
71  import java.util.function.Consumer;
72  import java.util.function.Function;
73  
74  /**
75   * {@link QuicChannel} implementation that uses <a href="https://github.com/cloudflare/quiche">quiche</a>.
76   */
77  final class QuicheQuicChannel extends AbstractChannel implements QuicChannel {
78      private static final InternalLogger logger = InternalLoggerFactory.getInstance(QuicheQuicChannel.class);
79      private static final String QLOG_FILE_EXTENSION = ".qlog";
80  
81      enum StreamRecvResult {
82          /**
83           * Nothing more to read from the stream.
84           */
85          DONE,
86          /**
87           * FIN flag received.
88           */
89          FIN,
90          /**
91           * Normal read without FIN flag.
92           */
93          OK
94      }
95  
96      private enum ChannelState {
97          OPEN,
98          ACTIVE,
99          CLOSED
100     }
101 
102     private enum SendResult {
103         SOME,
104         NONE,
105         CLOSE
106     }
107 
108     private static final class CloseData implements ChannelFutureListener {
109         final boolean applicationClose;
110         final int err;
111         final ByteBuf reason;
112 
113         CloseData(boolean applicationClose, int err, ByteBuf reason) {
114             this.applicationClose = applicationClose;
115             this.err = err;
116             this.reason = reason;
117         }
118 
119         @Override
120         public void operationComplete(ChannelFuture future) {
121             reason.release();
122         }
123     }
124 
125     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
126     private long[] readableStreams = new long[4];
127     private long[] writableStreams = new long[4];
128     private final LongObjectMap<QuicheQuicStreamChannel> streams = new LongObjectHashMap<>();
129     private final QuicheQuicChannelConfig config;
130     private final boolean server;
131     private final QuicStreamIdGenerator idGenerator;
132     private final ChannelHandler streamHandler;
133     private final Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray;
134     private final Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray;
135     private final TimeoutHandler timeoutHandler;
136     private final QuicConnectionIdGenerator connectionIdAddressGenerator;
137     private final QuicResetTokenGenerator resetTokenGenerator;
138     private final Set<ByteBuffer> sourceConnectionIds = new HashSet<>();
139 
140     private Consumer<QuicheQuicChannel> freeTask;
141     private Executor sslTaskExecutor;
142     private boolean inFireChannelReadCompleteQueue;
143     private boolean fireChannelReadCompletePending;
144     private ByteBuf finBuffer;
145     private ByteBuf outErrorCodeBuffer;
146     private ChannelPromise connectPromise;
147     private ScheduledFuture<?> connectTimeoutFuture;
148     private QuicConnectionAddress connectAddress;
149     private CloseData closeData;
150     private QuicConnectionCloseEvent connectionCloseEvent;
151     private QuicConnectionStats statsAtClose;
152     private boolean supportsDatagram;
153     private boolean recvDatagramPending;
154     private boolean datagramReadable;
155     private boolean recvStreamPending;
156     private boolean streamReadable;
157     private boolean handshakeCompletionNotified;
158     private boolean earlyDataReadyNotified;
159     private int reantranceGuard;
160     private static final int IN_RECV = 1 << 1;
161     private static final int IN_CONNECTION_SEND = 1 << 2;
162     private static final int IN_HANDLE_WRITABLE_STREAMS = 1 << 3;
163     private volatile ChannelState state = ChannelState.OPEN;
164     private volatile boolean timedOut;
165     private volatile String traceId;
166     private volatile QuicheQuicConnection connection;
167     private volatile InetSocketAddress local;
168     private volatile InetSocketAddress remote;
169 
170     private final ChannelFutureListener continueSendingListener = f -> {
171         if (connectionSend(connection) != SendResult.NONE) {
172             flushParent();
173         }
174     };
175 
176     private static final AtomicLongFieldUpdater<QuicheQuicChannel> UNI_STREAMS_LEFT_UPDATER =
177             AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "uniStreamsLeft");
178     private volatile long uniStreamsLeft;
179 
180     private static final AtomicLongFieldUpdater<QuicheQuicChannel> BIDI_STREAMS_LEFT_UPDATER =
181             AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "bidiStreamsLeft");
182     private volatile long bidiStreamsLeft;
183 
184     private QuicheQuicChannel(Channel parent, boolean server, @Nullable ByteBuffer key, InetSocketAddress local,
185                               InetSocketAddress remote, boolean supportsDatagram, ChannelHandler streamHandler,
186                               Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
187                               Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
188                               @Nullable Consumer<QuicheQuicChannel> freeTask,
189                               @Nullable Executor sslTaskExecutor,
190                               @Nullable QuicConnectionIdGenerator connectionIdAddressGenerator,
191                               @Nullable QuicResetTokenGenerator resetTokenGenerator) {
192         super(parent);
193         config = new QuicheQuicChannelConfig(this);
194         this.freeTask = freeTask;
195         this.server = server;
196         this.idGenerator = new QuicStreamIdGenerator(server);
197         this.connectionIdAddressGenerator = connectionIdAddressGenerator;
198         this.resetTokenGenerator = resetTokenGenerator;
199         if (key != null) {
200             this.sourceConnectionIds.add(key);
201         }
202 
203         this.supportsDatagram = supportsDatagram;
204         this.local = local;
205         this.remote = remote;
206 
207         this.streamHandler = streamHandler;
208         this.streamOptionsArray = streamOptionsArray;
209         this.streamAttrsArray = streamAttrsArray;
210         timeoutHandler = new TimeoutHandler();
211         this.sslTaskExecutor = sslTaskExecutor == null ? ImmediateExecutor.INSTANCE : sslTaskExecutor;
212     }
213 
214     static QuicheQuicChannel forClient(Channel parent, InetSocketAddress local, InetSocketAddress remote,
215                                        ChannelHandler streamHandler,
216                                        Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
217                                        Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray) {
218         return new QuicheQuicChannel(parent, false, null, local, remote, false, streamHandler,
219                 streamOptionsArray, streamAttrsArray, null, null, null, null);
220     }
221 
222     static QuicheQuicChannel forServer(Channel parent, ByteBuffer key, InetSocketAddress local,
223                                        InetSocketAddress remote,
224                                        boolean supportsDatagram, ChannelHandler streamHandler,
225                                        Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
226                                        Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
227                                        Consumer<QuicheQuicChannel> freeTask, Executor sslTaskExecutor,
228                                        QuicConnectionIdGenerator connectionIdAddressGenerator,
229                                        QuicResetTokenGenerator resetTokenGenerator) {
230         return new QuicheQuicChannel(parent, true, key, local, remote, supportsDatagram,
231                 streamHandler, streamOptionsArray, streamAttrsArray, freeTask,
232                 sslTaskExecutor, connectionIdAddressGenerator, resetTokenGenerator);
233     }
234 
235     private static final int MAX_ARRAY_LEN = 128;
236 
237     private static long[] growIfNeeded(long[] array, int maxLength) {
238         if (maxLength > array.length) {
239             if (array.length == MAX_ARRAY_LEN) {
240                 return array;
241             }
242             // Increase by 4 until we reach MAX_ARRAY_LEN
243             return new long[Math.min(MAX_ARRAY_LEN, array.length + 4)];
244         }
245         return array;
246     }
247 
248     @Override
249     public boolean isTimedOut() {
250         return timedOut;
251     }
252 
253     @Override
254     public SSLEngine sslEngine() {
255         QuicheQuicConnection connection = this.connection;
256         return connection == null ? null : connection.engine();
257     }
258 
259     private void notifyAboutHandshakeCompletionIfNeeded(QuicheQuicConnection conn,
260                                                         @Nullable SSLHandshakeException cause) {
261         if (handshakeCompletionNotified) {
262             return;
263         }
264         if (cause != null) {
265             pipeline().fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
266             return;
267         }
268         if (conn.isFreed()) {
269             return;
270         }
271         switch (connection.engine().getHandshakeStatus()) {
272             case NOT_HANDSHAKING:
273             case FINISHED:
274                 handshakeCompletionNotified = true;
275                 pipeline().fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
276                 break;
277             default:
278                 break;
279         }
280     }
281 
282     @Override
283     public long peerAllowedStreams(QuicStreamType type) {
284         switch (type) {
285             case BIDIRECTIONAL:
286                 return bidiStreamsLeft;
287             case UNIDIRECTIONAL:
288                 return uniStreamsLeft;
289             default:
290                 return 0;
291         }
292     }
293 
294     void attachQuicheConnection(QuicheQuicConnection connection) {
295         this.connection = connection;
296 
297         byte[] traceId = Quiche.quiche_conn_trace_id(connection.address());
298         if (traceId != null) {
299             this.traceId = new String(traceId);
300         }
301 
302         connection.init(local, remote,
303                 sniHostname -> pipeline().fireUserEventTriggered(new SniCompletionEvent(sniHostname)));
304 
305         // Setup QLOG if needed.
306         QLogConfiguration configuration = config.getQLogConfiguration();
307         if (configuration != null) {
308             final String fileName;
309             File file = new File(configuration.path());
310             if (file.isDirectory()) {
311                 // Create directory if needed.
312                 file.mkdir();
313                 if (this.traceId != null) {
314                     fileName = configuration.path() + File.separatorChar + this.traceId + "-" +
315                             id().asShortText() + QLOG_FILE_EXTENSION;
316                 } else {
317                     fileName = configuration.path() + File.separatorChar + id().asShortText() + QLOG_FILE_EXTENSION;
318                 }
319             } else {
320                 fileName = configuration.path();
321             }
322 
323             if (!Quiche.quiche_conn_set_qlog_path(connection.address(), fileName,
324                     configuration.logTitle(), configuration.logDescription())) {
325                 logger.info("Unable to create qlog file: {} ", fileName);
326             }
327         }
328     }
329 
330     void connectNow(Function<QuicChannel, ? extends QuicSslEngine> engineProvider, Executor sslTaskExecutor,
331                     Consumer<QuicheQuicChannel> freeTask, long configAddr, int localConnIdLength,
332                     boolean supportsDatagram, ByteBuffer fromSockaddrMemory, ByteBuffer toSockaddrMemory)
333             throws Exception {
334         assert this.connection == null;
335         assert this.traceId == null;
336         assert this.sourceConnectionIds.isEmpty();
337 
338         this.sslTaskExecutor = sslTaskExecutor;
339         this.freeTask = freeTask;
340 
341         QuicConnectionAddress address = this.connectAddress;
342 
343         if (address == QuicConnectionAddress.EPHEMERAL) {
344             address = QuicConnectionAddress.random(localConnIdLength);
345         }
346         ByteBuffer connectId = address.id();
347         if (connectId.remaining() != localConnIdLength) {
348             failConnectPromiseAndThrow(new IllegalArgumentException("connectionAddress has length "
349                     + connectId.remaining()
350                     + " instead of " + localConnIdLength));
351         }
352         QuicSslEngine engine = engineProvider.apply(this);
353         if (!(engine instanceof QuicheQuicSslEngine)) {
354             failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not of type "
355                     + QuicheQuicSslEngine.class.getSimpleName()));
356             return;
357         }
358         if (!engine.getUseClientMode()) {
359             failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not create in client mode"));
360         }
361         QuicheQuicSslEngine quicheEngine = (QuicheQuicSslEngine) engine;
362         ByteBuf idBuffer = alloc().directBuffer(connectId.remaining()).writeBytes(connectId.duplicate());
363         try {
364             int fromSockaddrLen = SockaddrIn.setAddress(fromSockaddrMemory, local);
365             int toSockaddrLen = SockaddrIn.setAddress(toSockaddrMemory, remote);
366             QuicheQuicConnection connection = quicheEngine.createConnection(ssl ->
367                     Quiche.quiche_conn_new_with_tls(Quiche.readerMemoryAddress(idBuffer),
368                             idBuffer.readableBytes(), -1, -1,
369                             Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
370                             Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
371                             configAddr, ssl, false));
372             if (connection == null) {
373                 failConnectPromiseAndThrow(new ConnectException());
374                 return;
375             }
376             attachQuicheConnection(connection);
377             QuicClientSessionCache sessionCache = quicheEngine.ctx.getSessionCache();
378             if (sessionCache != null) {
379                 byte[] sessionBytes = sessionCache
380                         .getSession(quicheEngine.getSession().getPeerHost(), quicheEngine.getSession().getPeerPort());
381                 if (sessionBytes != null) {
382                     Quiche.quiche_conn_set_session(connection.address(), sessionBytes);
383                 }
384             }
385             this.supportsDatagram = supportsDatagram;
386             sourceConnectionIds.add(connectId);
387         } finally {
388             idBuffer.release();
389         }
390     }
391 
392     private void failConnectPromiseAndThrow(Exception e) throws Exception {
393         tryFailConnectPromise(e);
394         throw e;
395     }
396 
397     private boolean tryFailConnectPromise(Exception e) {
398         ChannelPromise promise = connectPromise;
399         if (promise != null) {
400             connectPromise = null;
401             promise.tryFailure(e);
402             return true;
403         }
404         return false;
405     }
406 
407     Set<ByteBuffer> sourceConnectionIds() {
408         return sourceConnectionIds;
409     }
410 
411     boolean markInFireChannelReadCompleteQueue() {
412         if (inFireChannelReadCompleteQueue) {
413             return false;
414         }
415         inFireChannelReadCompleteQueue = true;
416         return true;
417     }
418 
419     private void failPendingConnectPromise() {
420         ChannelPromise promise = QuicheQuicChannel.this.connectPromise;
421         if (promise != null) {
422             QuicheQuicChannel.this.connectPromise = null;
423             promise.tryFailure(new QuicClosedChannelException(this.connectionCloseEvent));
424         }
425     }
426 
427     void forceClose() {
428         unsafe().close(voidPromise());
429     }
430 
431     @Override
432     protected DefaultChannelPipeline newChannelPipeline() {
433         return new DefaultChannelPipeline(this) {
434             @Override
435             protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
436                 if (msg instanceof QuicStreamChannel) {
437                     QuicStreamChannel channel = (QuicStreamChannel) msg;
438                     Quic.setupChannel(channel, streamOptionsArray, streamAttrsArray, streamHandler, logger);
439                     ctx.channel().eventLoop().register(channel);
440                 } else {
441                     super.onUnhandledInboundMessage(ctx, msg);
442                 }
443             }
444         };
445     }
446 
447     @Override
448     public QuicChannel flush() {
449         super.flush();
450         return this;
451     }
452 
453     @Override
454     public QuicChannel read() {
455         super.read();
456         return this;
457     }
458 
459     @Override
460     public Future<QuicStreamChannel> createStream(QuicStreamType type, @Nullable ChannelHandler handler,
461                                                   Promise<QuicStreamChannel> promise) {
462         if (eventLoop().inEventLoop()) {
463             ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise);
464         } else {
465             eventLoop().execute(() -> ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise));
466         }
467         return promise;
468     }
469 
470     @Override
471     public ChannelFuture close(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
472         if (eventLoop().inEventLoop()) {
473             close0(applicationClose, error, reason, promise);
474         } else {
475             eventLoop().execute(() -> close0(applicationClose, error, reason, promise));
476         }
477         return promise;
478     }
479 
480     private void close0(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
481         if (closeData == null) {
482             if (!reason.hasMemoryAddress()) {
483                 // Copy to direct buffer as that's what we need.
484                 ByteBuf copy = alloc().directBuffer(reason.readableBytes()).writeBytes(reason);
485                 reason.release();
486                 reason = copy;
487             }
488             closeData = new CloseData(applicationClose, error, reason);
489             promise.addListener(closeData);
490         } else {
491             // We already have a close scheduled that uses a close data. Lets release the buffer early.
492             reason.release();
493         }
494         close(promise);
495     }
496 
497     @Override
498     public String toString() {
499         String traceId = this.traceId;
500         if (traceId == null) {
501             return "()" + super.toString();
502         } else {
503             return '(' + traceId + ')' + super.toString();
504         }
505     }
506 
507     @Override
508     protected AbstractUnsafe newUnsafe() {
509         return new QuicChannelUnsafe();
510     }
511 
512     @Override
513     protected boolean isCompatible(EventLoop eventLoop) {
514         return parent().eventLoop() == eventLoop;
515     }
516 
517     @Override
518     @Nullable
519     protected QuicConnectionAddress localAddress0() {
520         QuicheQuicConnection connection = this.connection;
521         return connection == null ? null : connection.sourceId();
522     }
523 
524     @Override
525     @Nullable
526     protected QuicConnectionAddress remoteAddress0() {
527         QuicheQuicConnection connection = this.connection;
528         return connection == null ? null : connection.destinationId();
529     }
530 
531     @Override
532     @Nullable
533     public QuicConnectionAddress localAddress() {
534         // Override so we never cache as the sourceId() can change over life-time.
535         return localAddress0();
536     }
537 
538     @Override
539     @Nullable
540     public QuicConnectionAddress remoteAddress() {
541         // Override so we never cache as the destinationId() can change over life-time.
542         return remoteAddress0();
543     }
544 
545     @Override
546     @Nullable
547     public SocketAddress localSocketAddress() {
548         return local;
549     }
550 
551     @Override
552     @Nullable
553     public SocketAddress remoteSocketAddress() {
554         return remote;
555     }
556 
557     @Override
558     protected void doBind(SocketAddress socketAddress) {
559         throw new UnsupportedOperationException();
560     }
561 
562     @Override
563     protected void doDisconnect() throws Exception {
564         doClose();
565     }
566 
567     @Override
568     protected void doClose() throws Exception {
569         if (state == ChannelState.CLOSED) {
570             return;
571         }
572         state = ChannelState.CLOSED;
573 
574         QuicheQuicConnection conn = this.connection;
575         if (conn == null || conn.isFreed()) {
576             if (closeData != null) {
577                 closeData.reason.release();
578                 closeData = null;
579             }
580             failPendingConnectPromise();
581             return;
582         }
583 
584         // Call connectionSend() so we ensure we send all that is queued before we close the channel
585         SendResult sendResult = connectionSend(conn);
586 
587         final boolean app;
588         final int err;
589         final ByteBuf reason;
590         if (closeData == null) {
591             app = false;
592             err = 0;
593             reason = Unpooled.EMPTY_BUFFER;
594         } else {
595             app = closeData.applicationClose;
596             err = closeData.err;
597             reason = closeData.reason;
598             closeData = null;
599         }
600 
601         failPendingConnectPromise();
602         try {
603             int res = Quiche.quiche_conn_close(conn.address(), app, err,
604                     Quiche.readerMemoryAddress(reason), reason.readableBytes());
605             if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
606                 throw Quiche.convertToException(res);
607             }
608             // As we called quiche_conn_close(...) we need to ensure we will call quiche_conn_send(...) either
609             // now or we will do so once we see the channelReadComplete event.
610             //
611             // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.close
612             if (connectionSend(conn) == SendResult.SOME) {
613                 sendResult = SendResult.SOME;
614             }
615         } finally {
616 
617             // making sure that connection statistics is available
618             // even after channel is closed
619             statsAtClose = collectStats0(conn, eventLoop().newPromise());
620             try {
621                 timedOut = Quiche.quiche_conn_is_timed_out(conn.address());
622 
623                 closeStreams();
624                 if (finBuffer != null) {
625                     finBuffer.release();
626                     finBuffer = null;
627                 }
628                 if (outErrorCodeBuffer != null) {
629                     outErrorCodeBuffer.release();
630                     outErrorCodeBuffer = null;
631                 }
632             } finally {
633                 if (sendResult == SendResult.SOME) {
634                     // As this is the close let us flush it asap.
635                     forceFlushParent();
636                 } else {
637                     flushParent();
638                 }
639                 conn.free();
640                 if (freeTask != null) {
641                     freeTask.accept(this);
642                 }
643                 timeoutHandler.cancel();
644 
645                 local = null;
646                 remote = null;
647             }
648         }
649     }
650 
651     @Override
652     protected void doBeginRead() {
653         recvDatagramPending = true;
654         recvStreamPending = true;
655         if (datagramReadable || streamReadable) {
656             ((QuicChannelUnsafe) unsafe()).recv();
657         }
658     }
659 
660     @Override
661     protected Object filterOutboundMessage(Object msg) {
662         if (msg instanceof ByteBuf) {
663             return msg;
664         }
665         throw new UnsupportedOperationException("Unsupported message type: " + StringUtil.simpleClassName(msg));
666     }
667 
668     @Override
669     protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) throws Exception {
670         if (!supportsDatagram) {
671             throw new UnsupportedOperationException("Datagram extension is not supported");
672         }
673         boolean sendSomething = false;
674         boolean retry = false;
675         QuicheQuicConnection conn = connection;
676         try {
677             for (;;) {
678                 ByteBuf buffer = (ByteBuf) channelOutboundBuffer.current();
679                 if (buffer == null) {
680                     break;
681                 }
682 
683                 int readable = buffer.readableBytes();
684                 if (readable == 0) {
685                     // Skip empty buffers.
686                     channelOutboundBuffer.remove();
687                     continue;
688                 }
689 
690                 final int res;
691                 if (!buffer.isDirect() || buffer.nioBufferCount() > 1) {
692                     ByteBuf tmpBuffer = alloc().directBuffer(readable);
693                     try {
694                         tmpBuffer.writeBytes(buffer, buffer.readerIndex(), readable);
695                         res = sendDatagram(conn, tmpBuffer);
696                     } finally {
697                         tmpBuffer.release();
698                     }
699                 } else {
700                     res = sendDatagram(conn, buffer);
701                 }
702                 if (res >= 0) {
703                     channelOutboundBuffer.remove();
704                     sendSomething = true;
705                     retry = false;
706                 } else {
707                     if (res == Quiche.QUICHE_ERR_BUFFER_TOO_SHORT) {
708                         retry = false;
709                         channelOutboundBuffer.remove(new BufferUnderflowException());
710                     } else if (res == Quiche.QUICHE_ERR_INVALID_STATE) {
711                         throw new UnsupportedOperationException("Remote peer does not support Datagram extension");
712                     } else if (res == Quiche.QUICHE_ERR_DONE) {
713                         if (retry) {
714                             // We already retried and it didn't work. Let's drop the datagrams on the floor.
715                             for (;;) {
716                                 if (!channelOutboundBuffer.remove()) {
717                                     // The buffer is empty now.
718                                     return;
719                                 }
720                             }
721                         }
722                         // Set sendSomething to false a we will call connectionSend() now.
723                         sendSomething = false;
724                         // If this returned DONE we couldn't write anymore. This happens if the internal queue
725                         // is full. In this case we should call quiche_conn_send(...) and so make space again.
726                         if (connectionSend(conn) != SendResult.NONE) {
727                             forceFlushParent();
728                         }
729                         // Let's try again to write the message.
730                         retry = true;
731                     } else {
732                         throw Quiche.convertToException(res);
733                     }
734                 }
735             }
736         } finally {
737             if (sendSomething && connectionSend(conn) != SendResult.NONE) {
738                 flushParent();
739             }
740         }
741     }
742 
743     private static int sendDatagram(QuicheQuicConnection conn, ByteBuf buf) throws ClosedChannelException {
744         return Quiche.quiche_conn_dgram_send(connectionAddressChecked(conn),
745                 Quiche.readerMemoryAddress(buf), buf.readableBytes());
746     }
747 
748     @Override
749     public QuicChannelConfig config() {
750         return config;
751     }
752 
753     @Override
754     public boolean isOpen() {
755         return state != ChannelState.CLOSED;
756     }
757 
758     @Override
759     public boolean isActive() {
760         return state == ChannelState.ACTIVE;
761     }
762 
763     @Override
764     public ChannelMetadata metadata() {
765         return METADATA;
766     }
767 
768     /**
769      * This may call {@link #flush()} on the parent channel if needed. The flush may be delayed until the read loop
770      * is over.
771      */
772     private void flushParent() {
773         if (!inFireChannelReadCompleteQueue) {
774             forceFlushParent();
775         }
776     }
777 
778     /**
779      * Call {@link #flush()} on the parent channel.
780      */
781     private void forceFlushParent() {
782         parent().flush();
783     }
784 
785     private static long connectionAddressChecked(@Nullable QuicheQuicConnection conn) throws ClosedChannelException {
786         if (conn == null || conn.isFreed()) {
787             throw new ClosedChannelException();
788         }
789         return conn.address();
790     }
791 
792     boolean freeIfClosed() {
793         QuicheQuicConnection conn = connection;
794         if (conn == null || conn.isFreed()) {
795             return true;
796         }
797         if (conn.isClosed()) {
798             unsafe().close(newPromise());
799             return true;
800         }
801         return false;
802     }
803 
804     private void closeStreams() {
805         if (streams.isEmpty()) {
806             return;
807         }
808         final ClosedChannelException closedChannelException;
809         if (isTimedOut()) {
810             // Close the streams because of a timeout.
811             closedChannelException = new QuicTimeoutClosedChannelException();
812         } else {
813             closedChannelException = new ClosedChannelException();
814         }
815         // Make a copy to ensure we not run into a situation when we change the underlying iterator from
816         // another method and so run in an assert error.
817         for (QuicheQuicStreamChannel stream: streams.values().toArray(new QuicheQuicStreamChannel[0])) {
818             stream.unsafe().close(closedChannelException, voidPromise());
819         }
820         streams.clear();
821     }
822 
823     void streamPriority(long streamId, byte priority, boolean incremental) throws Exception {
824        int res = Quiche.quiche_conn_stream_priority(connectionAddressChecked(connection), streamId,
825                priority, incremental);
826        if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
827            throw Quiche.convertToException(res);
828        }
829     }
830 
831     void streamClosed(long streamId) {
832         streams.remove(streamId);
833     }
834 
835     boolean isStreamLocalCreated(long streamId) {
836         return (streamId & 0x1) == (server ? 1 : 0);
837     }
838 
839     QuicStreamType streamType(long streamId) {
840         return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
841     }
842 
843     void streamShutdown(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
844         QuicheQuicConnection conn = this.connection;
845         final long connectionAddress;
846         try {
847             connectionAddress = connectionAddressChecked(conn);
848         } catch (ClosedChannelException e) {
849             promise.setFailure(e);
850             return;
851         }
852         int res = 0;
853         if (read) {
854             res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_READ, err);
855         }
856         if (write) {
857             res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_WRITE, err);
858         }
859 
860         // As we called quiche_conn_stream_shutdown(...) we need to ensure we will call quiche_conn_send(...) either
861         // now or we will do so once we see the channelReadComplete event.
862         //
863         // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
864         if (connectionSend(conn) != SendResult.NONE) {
865             // Force the flush so the shutdown can be seen asap.
866             forceFlushParent();
867         }
868         if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
869             promise.setFailure(Quiche.convertToException(res));
870         } else {
871             promise.setSuccess();
872         }
873     }
874 
875     void streamSendFin(long streamId) throws Exception {
876         QuicheQuicConnection conn = connection;
877         try {
878             // Just write an empty buffer and set fin to true.
879             int res = streamSend0(conn, streamId, Unpooled.EMPTY_BUFFER, true);
880             if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
881                 throw Quiche.convertToException(res);
882             }
883         } finally {
884             // As we called quiche_conn_stream_send(...) we need to ensure we will call quiche_conn_send(...) either
885             // now or we will do so once we see the channelReadComplete event.
886             //
887             // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
888             if (connectionSend(conn) != SendResult.NONE) {
889                 flushParent();
890             }
891         }
892     }
893 
894     int streamSend(long streamId, ByteBuf buffer, boolean fin) throws ClosedChannelException {
895         QuicheQuicConnection conn = connection;
896         if (buffer.nioBufferCount() == 1) {
897             return streamSend0(conn, streamId, buffer, fin);
898         }
899         ByteBuffer[] nioBuffers  = buffer.nioBuffers();
900         int lastIdx = nioBuffers.length - 1;
901         int res = 0;
902         for (int i = 0; i < lastIdx; i++) {
903             ByteBuffer nioBuffer = nioBuffers[i];
904             while (nioBuffer.hasRemaining()) {
905                 int localRes = streamSend(conn, streamId, nioBuffer, false);
906                 if (localRes <= 0) {
907                     return res;
908                 }
909                 res += localRes;
910 
911                 nioBuffer.position(nioBuffer.position() + localRes);
912             }
913         }
914         int localRes = streamSend(conn, streamId, nioBuffers[lastIdx], fin);
915         if (localRes > 0) {
916             res += localRes;
917         }
918         return res;
919     }
920 
921     void connectionSendAndFlush() {
922         if (inFireChannelReadCompleteQueue || (reantranceGuard & IN_HANDLE_WRITABLE_STREAMS) != 0) {
923             return;
924         }
925         if (connectionSend(connection) != SendResult.NONE) {
926             flushParent();
927         }
928     }
929 
930     private int streamSend0(QuicheQuicConnection conn, long streamId, ByteBuf buffer, boolean fin)
931             throws ClosedChannelException {
932         return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
933                 Quiche.readerMemoryAddress(buffer), buffer.readableBytes(), fin);
934     }
935 
936     private int streamSend(QuicheQuicConnection conn, long streamId, ByteBuffer buffer, boolean fin)
937             throws ClosedChannelException {
938         return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
939                 Quiche.memoryAddressWithPosition(buffer), buffer.remaining(), fin);
940     }
941 
942     StreamRecvResult streamRecv(long streamId, ByteBuf buffer) throws Exception {
943         QuicheQuicConnection conn = connection;
944         long connAddr = connectionAddressChecked(conn);
945         if (finBuffer == null) {
946             finBuffer = alloc().directBuffer(1);
947         }
948         if (outErrorCodeBuffer == null) {
949             outErrorCodeBuffer = alloc().directBuffer(8);
950         }
951         outErrorCodeBuffer.setLongLE(0, -1L);
952         int writerIndex = buffer.writerIndex();
953         int recvLen = Quiche.quiche_conn_stream_recv(connAddr, streamId,
954                 Quiche.writerMemoryAddress(buffer), buffer.writableBytes(), Quiche.writerMemoryAddress(finBuffer),
955                 Quiche.writerMemoryAddress(outErrorCodeBuffer));
956         long errorCode = outErrorCodeBuffer.getLongLE(0);
957         if (recvLen == Quiche.QUICHE_ERR_DONE) {
958             return StreamRecvResult.DONE;
959         } else if (recvLen < 0) {
960             throw Quiche.convertToException(recvLen, errorCode);
961         }
962         buffer.writerIndex(writerIndex + recvLen);
963         return finBuffer.getBoolean(0) ? StreamRecvResult.FIN : StreamRecvResult.OK;
964     }
965 
966     /**
967      * Receive some data on a QUIC connection.
968      */
969     void recv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
970         ((QuicChannelUnsafe) unsafe()).connectionRecv(sender, recipient, buffer);
971     }
972 
973     /**
974      * Return all source connection ids that are retired and so should be removed to map to the channel.
975      *
976      * @return retired ids.
977      */
978     List<ByteBuffer> retiredSourceConnectionId() {
979         QuicheQuicConnection connection = this.connection;
980         if (connection == null || connection.isFreed()) {
981             return Collections.emptyList();
982         }
983         long connAddr = connection.address();
984         assert connAddr != -1;
985         List<ByteBuffer> retiredSourceIds = null;
986         for (;;) {
987             byte[] retired = Quiche.quiche_conn_retired_scid_next(connAddr);
988             if (retired == null) {
989                 break;
990             }
991             if (retiredSourceIds == null) {
992                 retiredSourceIds = new ArrayList<>();
993             }
994             ByteBuffer retiredId = ByteBuffer.wrap(retired);
995             retiredSourceIds.add(retiredId);
996             sourceConnectionIds.remove(retiredId);
997         }
998         if (retiredSourceIds == null) {
999             return Collections.emptyList();
1000         }
1001         return retiredSourceIds;
1002     }
1003 
1004     List<ByteBuffer> newSourceConnectionIds() {
1005         if (connectionIdAddressGenerator != null && resetTokenGenerator != null) {
1006             QuicheQuicConnection connection = this.connection;
1007             if (connection == null || connection.isFreed()) {
1008                 return Collections.emptyList();
1009             }
1010             long connAddr = connection.address();
1011             // Generate all extra source ids that we can provide. This will cause frames that need to be sent. Which
1012             // is the reason why we might need to call connectionSendAndFlush().
1013             int left = Quiche.quiche_conn_scids_left(connAddr);
1014             if (left > 0) {
1015                 QuicConnectionAddress sourceAddr = connection.sourceId();
1016                 if (sourceAddr == null) {
1017                     return Collections.emptyList();
1018                 }
1019                 List<ByteBuffer> generatedIds = new ArrayList<>(left);
1020                 boolean sendAndFlush = false;
1021                 ByteBuffer key = sourceAddr.id();
1022                 ByteBuf connIdBuffer = alloc().directBuffer(key.remaining());
1023 
1024                 byte[] resetTokenArray = new byte[Quic.RESET_TOKEN_LEN];
1025                 try {
1026                     do {
1027                         ByteBuffer srcId = connectionIdAddressGenerator.newId(key.duplicate(), key.remaining())
1028                                 .asReadOnlyBuffer();
1029                         connIdBuffer.clear();
1030                         connIdBuffer.writeBytes(srcId.duplicate());
1031                         ByteBuffer resetToken = resetTokenGenerator.newResetToken(srcId.duplicate());
1032                         resetToken.get(resetTokenArray);
1033                         long result = Quiche.quiche_conn_new_scid(
1034                                 connAddr, Quiche.memoryAddress(connIdBuffer, 0, connIdBuffer.readableBytes()),
1035                                 connIdBuffer.readableBytes(), resetTokenArray, false, -1);
1036                         if (result < 0) {
1037                             break;
1038                         }
1039                         sendAndFlush = true;
1040                         generatedIds.add(srcId.duplicate());
1041                         sourceConnectionIds.add(srcId);
1042                     } while (--left > 0);
1043                 } finally {
1044                     connIdBuffer.release();
1045                 }
1046 
1047                 if (sendAndFlush) {
1048                     connectionSendAndFlush();
1049                 }
1050                 return generatedIds;
1051             }
1052         }
1053         return Collections.emptyList();
1054     }
1055 
1056     void writable() {
1057         QuicheQuicConnection conn = connection;
1058         SendResult result = connectionSend(conn);
1059         handleWritableStreams(conn);
1060         if (connectionSend(conn) == SendResult.SOME) {
1061             result = SendResult.SOME;
1062         }
1063         if (result == SendResult.SOME) {
1064             // The writability changed so lets flush as fast as possible.
1065             forceFlushParent();
1066         }
1067         freeIfClosed();
1068     }
1069 
1070     long streamCapacity(long streamId) {
1071         QuicheQuicConnection conn = connection;
1072         if (conn.isClosed()) {
1073             return 0;
1074         }
1075         return Quiche.quiche_conn_stream_capacity(conn.address(), streamId);
1076     }
1077 
1078     private boolean handleWritableStreams(QuicheQuicConnection conn) {
1079         if (conn.isFreed()) {
1080             return false;
1081         }
1082         reantranceGuard |= IN_HANDLE_WRITABLE_STREAMS;
1083         try {
1084             long connAddr = conn.address();
1085             boolean mayNeedWrite = false;
1086 
1087             if (Quiche.quiche_conn_is_established(connAddr) ||
1088                     Quiche.quiche_conn_is_in_early_data(connAddr)) {
1089                 long writableIterator = Quiche.quiche_conn_writable(connAddr);
1090 
1091                 int totalWritable = 0;
1092                 try {
1093                     // For streams we always process all streams when at least on read was requested.
1094                     for (;;) {
1095                         int writable = Quiche.quiche_stream_iter_next(
1096                                 writableIterator, writableStreams);
1097                         for (int i = 0; i < writable; i++) {
1098                             long streamId = writableStreams[i];
1099                             QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1100                             if (streamChannel != null) {
1101                                 long capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
1102                                 if (streamChannel.writable(capacity)) {
1103                                     mayNeedWrite = true;
1104                                 }
1105                             }
1106                         }
1107                         if (writable > 0) {
1108                             totalWritable += writable;
1109                         }
1110                         if (writable < writableStreams.length) {
1111                             // We did handle all writable streams.
1112                             break;
1113                         }
1114                     }
1115                 } finally {
1116                     Quiche.quiche_stream_iter_free(writableIterator);
1117                 }
1118                 writableStreams = growIfNeeded(writableStreams, totalWritable);
1119             }
1120             return mayNeedWrite;
1121         } finally {
1122             reantranceGuard &= ~IN_HANDLE_WRITABLE_STREAMS;
1123         }
1124     }
1125 
1126     /**
1127      * Called once we receive a channelReadComplete event. This method will take care of calling
1128      * {@link ChannelPipeline#fireChannelReadComplete()} if needed and also to handle pending flushes of
1129      * writable {@link QuicheQuicStreamChannel}s.
1130      */
1131     void recvComplete() {
1132         try {
1133             QuicheQuicConnection conn = connection;
1134             if (conn.isFreed()) {
1135                 // Ensure we flush all pending writes.
1136                 forceFlushParent();
1137                 return;
1138             }
1139             fireChannelReadCompleteIfNeeded();
1140 
1141             // If we had called recv we need to ensure we call send as well.
1142             // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
1143             connectionSend(conn);
1144 
1145             // We are done with the read loop, flush all pending writes now.
1146             forceFlushParent();
1147             freeIfClosed();
1148         } finally {
1149             inFireChannelReadCompleteQueue = false;
1150         }
1151     }
1152 
1153     private void fireChannelReadCompleteIfNeeded() {
1154         if (fireChannelReadCompletePending) {
1155             fireChannelReadCompletePending = false;
1156             pipeline().fireChannelReadComplete();
1157         }
1158     }
1159 
1160     private void fireExceptionEvents(QuicheQuicConnection conn, Throwable cause) {
1161         if (cause instanceof SSLHandshakeException) {
1162             notifyAboutHandshakeCompletionIfNeeded(conn, (SSLHandshakeException) cause);
1163         }
1164         pipeline().fireExceptionCaught(cause);
1165     }
1166 
1167     private boolean runTasksDirectly() {
1168         return sslTaskExecutor == null || sslTaskExecutor == ImmediateExecutor.INSTANCE ||
1169                 sslTaskExecutor == ImmediateEventExecutor.INSTANCE;
1170     }
1171 
1172     private void runAllTaskSend(QuicheQuicConnection conn, Runnable task) {
1173         sslTaskExecutor.execute(decorateTaskSend(conn, task));
1174     }
1175 
1176     private void runAll(QuicheQuicConnection conn, Runnable task) {
1177         do {
1178             task.run();
1179         } while ((task = conn.sslTask()) != null);
1180     }
1181 
1182     private Runnable decorateTaskSend(QuicheQuicConnection conn, Runnable task) {
1183         return () -> {
1184             try {
1185                 runAll(conn, task);
1186             } finally {
1187                 // Move back to the EventLoop.
1188                 eventLoop().execute(() -> {
1189                     // Call connection send to continue handshake if needed.
1190                     if (connectionSend(conn) != SendResult.NONE) {
1191                         forceFlushParent();
1192                     }
1193                     freeIfClosed();
1194                 });
1195             }
1196         };
1197     }
1198 
1199     private SendResult connectionSendSegments(QuicheQuicConnection conn,
1200                                               SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
1201         if (conn.isClosed()) {
1202             return SendResult.NONE;
1203         }
1204         List<ByteBuf> bufferList = new ArrayList<>(segmentedDatagramPacketAllocator.maxNumSegments());
1205         long connAddr = conn.address();
1206         int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1207         SendResult sendResult = SendResult.NONE;
1208         boolean close = false;
1209         try {
1210             for (;;) {
1211                 int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1212                 ByteBuf out = alloc().directBuffer(len);
1213 
1214                 ByteBuffer sendInfo = conn.nextSendInfo();
1215                 InetSocketAddress sendToAddress = this.remote;
1216 
1217                 int writerIndex = out.writerIndex();
1218                 int written = Quiche.quiche_conn_send(
1219                         connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1220                         Quiche.memoryAddressWithPosition(sendInfo));
1221                 if (written == 0) {
1222                     out.release();
1223                     // No need to create a new datagram packet. Just try again.
1224                     continue;
1225                 }
1226                 final boolean done;
1227                 if (written < 0) {
1228                     done = true;
1229                     if (written != Quiche.QUICHE_ERR_DONE) {
1230                         close = Quiche.shouldClose(written);
1231                         Exception e = Quiche.convertToException(written);
1232                         if (!tryFailConnectPromise(e)) {
1233                             // Only fire through the pipeline if this does not fail the connect promise.
1234                             fireExceptionEvents(conn, e);
1235                         }
1236                     }
1237                 } else {
1238                     done = false;
1239                 }
1240                 int size = bufferList.size();
1241                 if (done) {
1242                     // We are done, release the buffer and send what we did build up so far.
1243                     out.release();
1244 
1245                     switch (size) {
1246                         case 0:
1247                             // Nothing more to write.
1248                             break;
1249                         case 1:
1250                             // We can write a normal datagram packet.
1251                             parent().write(new DatagramPacket(bufferList.get(0), sendToAddress));
1252                             sendResult = SendResult.SOME;
1253                             break;
1254                         default:
1255                             int segmentSize = segmentSize(bufferList);
1256                             ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
1257                             // We had more than one buffer, create a segmented packet.
1258                             parent().write(segmentedDatagramPacketAllocator.newPacket(
1259                                     compositeBuffer, segmentSize, sendToAddress));
1260                             sendResult = SendResult.SOME;
1261                             break;
1262                     }
1263                     bufferList.clear();
1264                     if (close) {
1265                         sendResult = SendResult.CLOSE;
1266                     }
1267                     return sendResult;
1268                 }
1269                 out.writerIndex(writerIndex + written);
1270 
1271                 int segmentSize = -1;
1272                 if (conn.isSendInfoChanged()) {
1273                     // Change the cached address and let the user know there was a connection migration.
1274                     remote = QuicheSendInfo.getToAddress(sendInfo);
1275                     local = QuicheSendInfo.getFromAddress(sendInfo);
1276 
1277                     if (size > 0) {
1278                         // We have something in the out list already, we need to send this now and so we set the
1279                         // segmentSize.
1280                         segmentSize = segmentSize(bufferList);
1281                     }
1282                 } else if (size > 0) {
1283                     int lastReadable = segmentSize(bufferList);
1284                     // Check if we either need to send now because the last buffer we added has a smaller size then this
1285                     // one or if we reached the maximum number of segments that we can send.
1286                     if (lastReadable != out.readableBytes() ||
1287                             size == segmentedDatagramPacketAllocator.maxNumSegments()) {
1288                         segmentSize = lastReadable;
1289                     }
1290                 }
1291 
1292                 // If the segmentSize is not -1 we know we need to send now what was in the out list.
1293                 if (segmentSize != -1) {
1294                     final boolean stop;
1295                     if (size == 1) {
1296                         // Only one buffer in the out list, there is no need to use segments.
1297                         stop = writePacket(new DatagramPacket(
1298                                 bufferList.get(0), sendToAddress), maxDatagramSize, len);
1299                     } else {
1300                         // Create a packet with segments in.
1301                         ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
1302                         stop = writePacket(segmentedDatagramPacketAllocator.newPacket(
1303                                 compositeBuffer, segmentSize, sendToAddress), maxDatagramSize, len);
1304                     }
1305                     bufferList.clear();
1306                     sendResult = SendResult.SOME;
1307 
1308                     if (stop) {
1309                         // Nothing left in the window, continue later. That said we still need to also
1310                         // write the previous filled out buffer as otherwise we would either leak or need
1311                         // to drop it and so produce some loss.
1312                         if (out.isReadable()) {
1313                             parent().write(new DatagramPacket(out, sendToAddress));
1314                         } else {
1315                             out.release();
1316                         }
1317                         if (close) {
1318                             sendResult = SendResult.CLOSE;
1319                         }
1320                         return sendResult;
1321                     }
1322                 }
1323                 // Let's add a touch with the bufferList as a hint. This will help us to debug leaks if there
1324                 // are any.
1325                 out.touch(bufferList);
1326                 // store for later, so we can make use of segments.
1327                 bufferList.add(out);
1328             }
1329         } finally {
1330             // NOOP
1331         }
1332     }
1333 
1334     private static int segmentSize(List<ByteBuf> bufferList) {
1335         assert !bufferList.isEmpty();
1336         int size = bufferList.size();
1337         return bufferList.get(size - 1).readableBytes();
1338     }
1339 
1340     private SendResult connectionSendSimple(QuicheQuicConnection conn) {
1341         if (conn.isClosed()) {
1342             return SendResult.NONE;
1343         }
1344         long connAddr = conn.address();
1345         SendResult sendResult = SendResult.NONE;
1346         boolean close = false;
1347         int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1348         for (;;) {
1349             ByteBuffer sendInfo = conn.nextSendInfo();
1350 
1351             int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1352             ByteBuf out = alloc().directBuffer(len);
1353             int writerIndex = out.writerIndex();
1354 
1355             int written = Quiche.quiche_conn_send(
1356                     connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1357                     Quiche.memoryAddressWithPosition(sendInfo));
1358 
1359             if (written == 0) {
1360                 // No need to create a new datagram packet. Just release and try again.
1361                 out.release();
1362                 continue;
1363             }
1364             if (written < 0) {
1365                 out.release();
1366                 if (written != Quiche.QUICHE_ERR_DONE) {
1367                     close = Quiche.shouldClose(written);
1368 
1369                     Exception e = Quiche.convertToException(written);
1370                     if (!tryFailConnectPromise(e)) {
1371                         fireExceptionEvents(conn, e);
1372                     }
1373                 }
1374                 break;
1375             }
1376             if (conn.isSendInfoChanged()) {
1377                 // Change the cached address
1378                 remote = QuicheSendInfo.getToAddress(sendInfo);
1379                 local = QuicheSendInfo.getFromAddress(sendInfo);
1380             }
1381             out.writerIndex(writerIndex + written);
1382             boolean stop = writePacket(new DatagramPacket(out, remote), maxDatagramSize, len);
1383             sendResult = SendResult.SOME;
1384             if (stop) {
1385                 // Nothing left in the window, continue later
1386                 break;
1387             }
1388         }
1389         if (close) {
1390             sendResult = SendResult.CLOSE;
1391         }
1392         return sendResult;
1393     }
1394 
1395     private boolean writePacket(DatagramPacket packet, int maxDatagramSize, int len) {
1396         ChannelFuture future = parent().write(packet);
1397         if (isSendWindowUsed(maxDatagramSize, len)) {
1398             // Nothing left in the window, continue later
1399             future.addListener(continueSendingListener);
1400             return true;
1401         }
1402         return false;
1403     }
1404 
1405     private static boolean isSendWindowUsed(int maxDatagramSize, int len) {
1406         return len < maxDatagramSize;
1407     }
1408 
1409     private static int calculateSendBufferLength(long connAddr, int maxDatagramSize) {
1410         int len = Math.min(maxDatagramSize, Quiche.quiche_conn_send_quantum(connAddr));
1411         if (len <= 0) {
1412             // If there is no room left we just return some small number to reduce the risk of packet drop
1413             // while still be able to attach the listener to the write future.
1414             // We use the value of 8 because such an allocation will be cheap to serve from the
1415             // PooledByteBufAllocator while still serve our need.
1416             return 8;
1417         }
1418         return len;
1419     }
1420 
1421     /**
1422      * Write datagrams if needed and return {@code true} if something was written and we need to call
1423      * {@link Channel#flush()} at some point.
1424      */
1425     private SendResult connectionSend(QuicheQuicConnection conn) {
1426         if (conn.isFreed()) {
1427             return SendResult.NONE;
1428         }
1429         if ((reantranceGuard & IN_CONNECTION_SEND) != 0) {
1430             // Let's notify about early data if needed.
1431             notifyEarlyDataReadyIfNeeded(conn);
1432             return SendResult.NONE;
1433         }
1434 
1435         reantranceGuard |= IN_CONNECTION_SEND;
1436         try {
1437             SendResult sendResult;
1438             SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator =
1439                     config.getSegmentedDatagramPacketAllocator();
1440             if (segmentedDatagramPacketAllocator.maxNumSegments() > 0) {
1441                 sendResult = connectionSendSegments(conn, segmentedDatagramPacketAllocator);
1442             } else {
1443                 sendResult = connectionSendSimple(conn);
1444             }
1445 
1446             // Process / schedule all tasks that were created.
1447 
1448             Runnable task = conn.sslTask();
1449             if (task != null) {
1450                 if (runTasksDirectly()) {
1451                     // Consume all tasks
1452                     do {
1453                         task.run();
1454                         // Notify about early data ready if needed.
1455                         notifyEarlyDataReadyIfNeeded(conn);
1456                     } while ((task = conn.sslTask()) != null);
1457 
1458                     // Let's try again sending after we did process all tasks.
1459                     // We schedule this on the EventLoop as otherwise we will get into trouble with re-entrance.
1460                     eventLoop().execute(new Runnable() {
1461                         @Override
1462                         public void run() {
1463                             // Call connection send to continue handshake if needed.
1464                             if (connectionSend(conn) != SendResult.NONE) {
1465                                 forceFlushParent();
1466                             }
1467                             freeIfClosed();
1468                         }
1469                     });
1470                 } else {
1471                     runAllTaskSend(conn, task);
1472                 }
1473             } else {
1474                 // Notify about early data ready if needed.
1475                 notifyEarlyDataReadyIfNeeded(conn);
1476             }
1477 
1478             // Whenever we called connection_send we should also schedule the timer if needed.
1479             timeoutHandler.scheduleTimeout();
1480             return sendResult;
1481         } finally {
1482             reantranceGuard &= ~IN_CONNECTION_SEND;
1483         }
1484     }
1485 
1486     private final class QuicChannelUnsafe extends AbstractChannel.AbstractUnsafe {
1487 
1488         void connectStream(QuicStreamType type, @Nullable ChannelHandler handler,
1489                            Promise<QuicStreamChannel> promise) {
1490             if (!promise.setUncancellable()) {
1491                 return;
1492             }
1493             long streamId = idGenerator.nextStreamId(type == QuicStreamType.BIDIRECTIONAL);
1494 
1495             try {
1496                 int res = streamSend0(connection, streamId, Unpooled.EMPTY_BUFFER, false);
1497                 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
1498                     throw Quiche.convertToException(res);
1499                 }
1500             } catch (Exception e) {
1501                 promise.setFailure(e);
1502                 return;
1503             }
1504             if (type == QuicStreamType.UNIDIRECTIONAL) {
1505                 UNI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1506             } else {
1507                 BIDI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1508             }
1509             QuicheQuicStreamChannel streamChannel = addNewStreamChannel(streamId);
1510             if (handler != null) {
1511                 streamChannel.pipeline().addLast(handler);
1512             }
1513             eventLoop().register(streamChannel).addListener((ChannelFuture f) -> {
1514                 if (f.isSuccess()) {
1515                     promise.setSuccess(streamChannel);
1516                 } else {
1517                     promise.setFailure(f.cause());
1518                     streams.remove(streamId);
1519                 }
1520             });
1521         }
1522 
1523         @Override
1524         public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
1525             assert eventLoop().inEventLoop();
1526             if (!channelPromise.setUncancellable()) {
1527                 return;
1528             }
1529             if (server) {
1530                 channelPromise.setFailure(new UnsupportedOperationException());
1531                 return;
1532             }
1533 
1534             if (connectPromise != null) {
1535                 channelPromise.setFailure(new ConnectionPendingException());
1536                 return;
1537             }
1538 
1539             if (remote instanceof QuicConnectionAddress) {
1540                 if (!sourceConnectionIds.isEmpty()) {
1541                     // If a key is assigned we know this channel was already connected.
1542                     channelPromise.setFailure(new AlreadyConnectedException());
1543                     return;
1544                 }
1545 
1546                 connectAddress = (QuicConnectionAddress) remote;
1547                 connectPromise = channelPromise;
1548 
1549                 // Schedule connect timeout.
1550                 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1551                 if (connectTimeoutMillis > 0) {
1552                     connectTimeoutFuture = eventLoop().schedule(() -> {
1553                         ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1554                         if (connectPromise != null && !connectPromise.isDone()
1555                                 && connectPromise.tryFailure(new ConnectTimeoutException(
1556                                 "connection timed out: " + remote))) {
1557                             close(voidPromise());
1558                         }
1559                     }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1560                 }
1561 
1562                 connectPromise.addListener((ChannelFuture future) -> {
1563                     if (future.isCancelled()) {
1564                         if (connectTimeoutFuture != null) {
1565                             connectTimeoutFuture.cancel(false);
1566                         }
1567                         connectPromise = null;
1568                         close(voidPromise());
1569                     }
1570                 });
1571 
1572                 parent().connect(new QuicheQuicChannelAddress(QuicheQuicChannel.this))
1573                         .addListener(f -> {
1574                             ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1575                             if (connectPromise != null && !f.isSuccess()) {
1576                                 connectPromise.tryFailure(f.cause());
1577                                 // close everything after notify about failure.
1578                                 unsafe().closeForcibly();
1579                             }
1580                         });
1581                 return;
1582             }
1583 
1584             channelPromise.setFailure(new UnsupportedOperationException());
1585         }
1586 
1587         private void fireConnectCloseEventIfNeeded(QuicheQuicConnection conn) {
1588             if (connectionCloseEvent == null && !conn.isFreed()) {
1589                 connectionCloseEvent = Quiche.quiche_conn_peer_error(conn.address());
1590                 if (connectionCloseEvent != null) {
1591                     pipeline().fireUserEventTriggered(connectionCloseEvent);
1592                 }
1593             }
1594         }
1595 
1596         void connectionRecv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
1597             QuicheQuicConnection conn = QuicheQuicChannel.this.connection;
1598             if (conn.isFreed()) {
1599                 return;
1600             }
1601             int bufferReadable = buffer.readableBytes();
1602             if (bufferReadable == 0) {
1603                 // Nothing to do here. Just return...
1604                 // See also https://github.com/cloudflare/quiche/issues/817
1605                 return;
1606             }
1607 
1608             reantranceGuard |= IN_RECV;
1609             boolean close = false;
1610             try {
1611                 ByteBuf tmpBuffer = null;
1612                 // We need to make a copy if the buffer is read only as recv(...) may modify the input buffer as well.
1613                 // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.recv
1614                 if (buffer.isReadOnly()) {
1615                     tmpBuffer = alloc().directBuffer(buffer.readableBytes());
1616                     tmpBuffer.writeBytes(buffer);
1617                     buffer = tmpBuffer;
1618                 }
1619                 long memoryAddress = Quiche.readerMemoryAddress(buffer);
1620 
1621                 ByteBuffer recvInfo = conn.nextRecvInfo();
1622                 QuicheRecvInfo.setRecvInfo(recvInfo, sender, recipient);
1623 
1624                 remote = sender;
1625                 local = recipient;
1626 
1627                 try {
1628                     do  {
1629                         // Call quiche_conn_recv(...) until we consumed all bytes or we did receive some error.
1630                         int res = Quiche.quiche_conn_recv(conn.address(), memoryAddress, bufferReadable,
1631                                 Quiche.memoryAddressWithPosition(recvInfo));
1632                         final boolean done;
1633                         if (res < 0) {
1634                             done = true;
1635                             if (res != Quiche.QUICHE_ERR_DONE) {
1636                                 close = Quiche.shouldClose(res);
1637                                 Exception e = Quiche.convertToException(res);
1638                                 if (tryFailConnectPromise(e)) {
1639                                     break;
1640                                 }
1641                                 fireExceptionEvents(conn, e);
1642                             }
1643                         } else {
1644                             done = false;
1645                         }
1646                         // Process / schedule all tasks that were created.
1647                         Runnable task = conn.sslTask();
1648                         if (task != null) {
1649                             if (runTasksDirectly()) {
1650                                 // Consume all tasks
1651                                 do {
1652                                     task.run();
1653                                 } while ((task = conn.sslTask()) != null);
1654                                 processReceived(conn);
1655                             } else {
1656                                 runAllTaskRecv(conn, task);
1657                             }
1658                         } else {
1659                             processReceived(conn);
1660                         }
1661 
1662                         if (done) {
1663                             break;
1664                         }
1665                         memoryAddress += res;
1666                         bufferReadable -= res;
1667                     } while (bufferReadable > 0 && !conn.isFreed());
1668                 } finally {
1669                     buffer.skipBytes((int) (memoryAddress - Quiche.readerMemoryAddress(buffer)));
1670                     if (tmpBuffer != null) {
1671                         tmpBuffer.release();
1672                     }
1673                 }
1674                 if (close) {
1675                     // Let's close now as there is no way to recover
1676                     unsafe().close(newPromise());
1677                 }
1678             } finally {
1679                 reantranceGuard &= ~IN_RECV;
1680             }
1681         }
1682 
1683         private void processReceived(QuicheQuicConnection conn) {
1684             // Handle pending channelActive if needed.
1685             if (handlePendingChannelActive(conn)) {
1686                 // Connection was closed right away.
1687                 return;
1688             }
1689 
1690             notifyAboutHandshakeCompletionIfNeeded(conn, null);
1691             fireConnectCloseEventIfNeeded(conn);
1692 
1693             if (conn.isFreed()) {
1694                 return;
1695             }
1696 
1697             long connAddr = conn.address();
1698             if (Quiche.quiche_conn_is_established(connAddr) ||
1699                     Quiche.quiche_conn_is_in_early_data(connAddr)) {
1700                 long uniLeftOld = uniStreamsLeft;
1701                 long bidiLeftOld = bidiStreamsLeft;
1702                 // Only fetch new stream info when we used all our credits
1703                 if (uniLeftOld == 0 || bidiLeftOld == 0) {
1704                     long uniLeft = Quiche.quiche_conn_peer_streams_left_uni(connAddr);
1705                     long bidiLeft = Quiche.quiche_conn_peer_streams_left_bidi(connAddr);
1706                     uniStreamsLeft = uniLeft;
1707                     bidiStreamsLeft = bidiLeft;
1708                     if (uniLeftOld != uniLeft || bidiLeftOld != bidiLeft) {
1709                         pipeline().fireUserEventTriggered(QuicStreamLimitChangedEvent.INSTANCE);
1710                     }
1711                 }
1712 
1713                 handlePathEvents(conn);
1714 
1715                 if (handleWritableStreams(conn)) {
1716                     // Some data was produced, let's flush.
1717                     flushParent();
1718                 }
1719 
1720                 datagramReadable = true;
1721                 streamReadable = true;
1722 
1723                 recvDatagram(conn);
1724                 recvStream(conn);
1725             }
1726         }
1727 
1728         private void handlePathEvents(QuicheQuicConnection conn) {
1729             long event;
1730             while (!conn.isFreed() && (event = Quiche.quiche_conn_path_event_next(conn.address())) > 0) {
1731                 try {
1732                     int type = Quiche.quiche_path_event_type(event);
1733 
1734                     if (type == Quiche.QUICHE_PATH_EVENT_NEW) {
1735                         Object[] ret = Quiche.quiche_path_event_new(event);
1736                         InetSocketAddress local = (InetSocketAddress) ret[0];
1737                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1738                         pipeline().fireUserEventTriggered(new QuicPathEvent.New(local, peer));
1739                     } else if (type == Quiche.QUICHE_PATH_EVENT_VALIDATED) {
1740                         Object[] ret = Quiche.quiche_path_event_validated(event);
1741                         InetSocketAddress local = (InetSocketAddress) ret[0];
1742                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1743                         pipeline().fireUserEventTriggered(new QuicPathEvent.Validated(local, peer));
1744                     } else if (type == Quiche.QUICHE_PATH_EVENT_FAILED_VALIDATION) {
1745                         Object[] ret = Quiche.quiche_path_event_failed_validation(event);
1746                         InetSocketAddress local = (InetSocketAddress) ret[0];
1747                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1748                         pipeline().fireUserEventTriggered(new QuicPathEvent.FailedValidation(local, peer));
1749                     } else if (type == Quiche.QUICHE_PATH_EVENT_CLOSED) {
1750                         Object[] ret = Quiche.quiche_path_event_closed(event);
1751                         InetSocketAddress local = (InetSocketAddress) ret[0];
1752                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1753                         pipeline().fireUserEventTriggered(new QuicPathEvent.Closed(local, peer));
1754                     } else if (type == Quiche.QUICHE_PATH_EVENT_REUSED_SOURCE_CONNECTION_ID) {
1755                         Object[] ret = Quiche.quiche_path_event_reused_source_connection_id(event);
1756                         Long seq = (Long) ret[0];
1757                         InetSocketAddress localOld = (InetSocketAddress) ret[1];
1758                         InetSocketAddress peerOld = (InetSocketAddress) ret[2];
1759                         InetSocketAddress local = (InetSocketAddress) ret[3];
1760                         InetSocketAddress peer = (InetSocketAddress) ret[4];
1761                         pipeline().fireUserEventTriggered(
1762                                 new QuicPathEvent.ReusedSourceConnectionId(seq, localOld, peerOld, local, peer));
1763                     } else if (type == Quiche.QUICHE_PATH_EVENT_PEER_MIGRATED) {
1764                         Object[] ret = Quiche.quiche_path_event_peer_migrated(event);
1765                         InetSocketAddress local = (InetSocketAddress) ret[0];
1766                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1767                         pipeline().fireUserEventTriggered(new QuicPathEvent.PeerMigrated(local, peer));
1768                     }
1769                 } finally {
1770                     Quiche.quiche_path_event_free(event);
1771                 }
1772             }
1773         }
1774 
1775         private void runAllTaskRecv(QuicheQuicConnection conn, Runnable task) {
1776             sslTaskExecutor.execute(decorateTaskRecv(conn, task));
1777         }
1778 
1779         private Runnable decorateTaskRecv(QuicheQuicConnection conn, Runnable task) {
1780             return () -> {
1781                 try {
1782                     runAll(conn, task);
1783                 } finally {
1784                     // Move back to the EventLoop.
1785                     eventLoop().execute(() -> {
1786                         if (!conn.isFreed()) {
1787                             processReceived(conn);
1788 
1789                             // Call connection send to continue handshake if needed.
1790                             if (connectionSend(conn) != SendResult.NONE) {
1791                                 forceFlushParent();
1792                             }
1793 
1794                             freeIfClosed();
1795                         }
1796                     });
1797                 }
1798             };
1799         }
1800         void recv() {
1801             QuicheQuicConnection conn = connection;
1802             if ((reantranceGuard & IN_RECV) != 0 || conn.isFreed()) {
1803                 return;
1804             }
1805 
1806             long connAddr = conn.address();
1807             // Check if we can read anything yet.
1808             if (!Quiche.quiche_conn_is_established(connAddr) &&
1809                     !Quiche.quiche_conn_is_in_early_data(connAddr)) {
1810                 return;
1811             }
1812 
1813             reantranceGuard |= IN_RECV;
1814             try {
1815                 recvDatagram(conn);
1816                 recvStream(conn);
1817             } finally {
1818                 fireChannelReadCompleteIfNeeded();
1819                 reantranceGuard &= ~IN_RECV;
1820             }
1821         }
1822 
1823         private void recvStream(QuicheQuicConnection conn) {
1824             if (conn.isFreed()) {
1825                 return;
1826             }
1827             long connAddr = conn.address();
1828             long readableIterator = Quiche.quiche_conn_readable(connAddr);
1829             int totalReadable = 0;
1830             if (readableIterator != -1) {
1831                 try {
1832                     // For streams we always process all streams when at least on read was requested.
1833                     if (recvStreamPending && streamReadable) {
1834                         for (;;) {
1835                             int readable = Quiche.quiche_stream_iter_next(
1836                                     readableIterator, readableStreams);
1837                             for (int i = 0; i < readable; i++) {
1838                                 long streamId = readableStreams[i];
1839                                 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1840                                 if (streamChannel == null) {
1841                                     recvStreamPending = false;
1842                                     fireChannelReadCompletePending = true;
1843                                     streamChannel = addNewStreamChannel(streamId);
1844                                     streamChannel.readable();
1845                                     pipeline().fireChannelRead(streamChannel);
1846                                 } else {
1847                                     streamChannel.readable();
1848                                 }
1849                             }
1850                             if (readable < readableStreams.length) {
1851                                 // We did consume all readable streams.
1852                                 streamReadable = false;
1853                                 break;
1854                             }
1855                             if (readable > 0) {
1856                                 totalReadable += readable;
1857                             }
1858                         }
1859                     }
1860                 } finally {
1861                     Quiche.quiche_stream_iter_free(readableIterator);
1862                 }
1863                 readableStreams = growIfNeeded(readableStreams, totalReadable);
1864             }
1865         }
1866 
1867         private void recvDatagram(QuicheQuicConnection conn) {
1868             if (!supportsDatagram) {
1869                 return;
1870             }
1871             while (recvDatagramPending && datagramReadable && !conn.isFreed()) {
1872                 @SuppressWarnings("deprecation")
1873                 RecvByteBufAllocator.Handle recvHandle = recvBufAllocHandle();
1874                 recvHandle.reset(config());
1875 
1876                 int numMessagesRead = 0;
1877                 do {
1878                     long connAddr = conn.address();
1879                     int len = Quiche.quiche_conn_dgram_recv_front_len(connAddr);
1880                     if (len == Quiche.QUICHE_ERR_DONE) {
1881                         datagramReadable = false;
1882                         return;
1883                     }
1884 
1885                     ByteBuf datagramBuffer = alloc().directBuffer(len);
1886                     recvHandle.attemptedBytesRead(datagramBuffer.writableBytes());
1887                     int writerIndex = datagramBuffer.writerIndex();
1888                     long memoryAddress = Quiche.writerMemoryAddress(datagramBuffer);
1889 
1890                     int written = Quiche.quiche_conn_dgram_recv(connAddr,
1891                             memoryAddress, datagramBuffer.writableBytes());
1892                     if (written < 0) {
1893                         datagramBuffer.release();
1894                         if (written == Quiche.QUICHE_ERR_DONE) {
1895                             // We did consume all datagram packets.
1896                             datagramReadable = false;
1897                             break;
1898                         }
1899                         pipeline().fireExceptionCaught(Quiche.convertToException(written));
1900                     }
1901                     recvHandle.lastBytesRead(written);
1902                     recvHandle.incMessagesRead(1);
1903                     numMessagesRead++;
1904                     datagramBuffer.writerIndex(writerIndex + written);
1905                     recvDatagramPending = false;
1906                     fireChannelReadCompletePending = true;
1907 
1908                     pipeline().fireChannelRead(datagramBuffer);
1909                 } while (recvHandle.continueReading() && !conn.isFreed());
1910                 recvHandle.readComplete();
1911 
1912                 // Check if we produced any messages.
1913                 if (numMessagesRead > 0) {
1914                     fireChannelReadCompleteIfNeeded();
1915                 }
1916             }
1917         }
1918 
1919         private boolean handlePendingChannelActive(QuicheQuicConnection conn) {
1920             if (conn.isFreed() || state == ChannelState.CLOSED) {
1921                 return true;
1922             }
1923             if (server) {
1924                 if (state == ChannelState.OPEN && Quiche.quiche_conn_is_established(conn.address())) {
1925                     // We didn't notify before about channelActive... Update state and fire the event.
1926                     state = ChannelState.ACTIVE;
1927 
1928                     pipeline().fireChannelActive();
1929                     notifyAboutHandshakeCompletionIfNeeded(conn, null);
1930                     fireDatagramExtensionEvent(conn);
1931                 }
1932             } else if (connectPromise != null && Quiche.quiche_conn_is_established(conn.address())) {
1933                 ChannelPromise promise = connectPromise;
1934                 connectPromise = null;
1935                 state = ChannelState.ACTIVE;
1936 
1937                 boolean promiseSet = promise.trySuccess();
1938                 pipeline().fireChannelActive();
1939                 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1940                 fireDatagramExtensionEvent(conn);
1941                 if (!promiseSet) {
1942                     fireConnectCloseEventIfNeeded(conn);
1943                     this.close(this.voidPromise());
1944                     return true;
1945                 }
1946             }
1947             return false;
1948         }
1949 
1950         private void fireDatagramExtensionEvent(QuicheQuicConnection conn) {
1951             if (conn.isClosed()) {
1952                 return;
1953             }
1954             long connAddr = conn.address();
1955             int len = Quiche.quiche_conn_dgram_max_writable_len(connAddr);
1956             // QUICHE_ERR_DONE means the remote peer does not support the extension.
1957             if (len != Quiche.QUICHE_ERR_DONE) {
1958                 pipeline().fireUserEventTriggered(new QuicDatagramExtensionEvent(len));
1959             }
1960         }
1961 
1962         private QuicheQuicStreamChannel addNewStreamChannel(long streamId) {
1963             QuicheQuicStreamChannel streamChannel = new QuicheQuicStreamChannel(
1964                     QuicheQuicChannel.this, streamId);
1965             QuicheQuicStreamChannel old = streams.put(streamId, streamChannel);
1966             assert old == null;
1967             streamChannel.writable(streamCapacity(streamId));
1968             return streamChannel;
1969         }
1970     }
1971 
1972     /**
1973      * Finish the connect operation of a client channel.
1974      */
1975     void finishConnect() {
1976         assert !server;
1977         assert connection != null;
1978         if (connectionSend(connection) != SendResult.NONE) {
1979             flushParent();
1980         }
1981     }
1982 
1983     private void notifyEarlyDataReadyIfNeeded(QuicheQuicConnection conn) {
1984         if (!server && !earlyDataReadyNotified &&
1985                 !conn.isFreed() && Quiche.quiche_conn_is_in_early_data(conn.address())) {
1986             earlyDataReadyNotified = true;
1987             pipeline().fireUserEventTriggered(SslEarlyDataReadyEvent.INSTANCE);
1988         }
1989     }
1990 
1991     private final class TimeoutHandler implements Runnable {
1992         private ScheduledFuture<?> timeoutFuture;
1993 
1994         @Override
1995         public void run() {
1996             QuicheQuicConnection conn = connection;
1997             if (conn.isFreed()) {
1998                 return;
1999             }
2000             if (!freeIfClosed()) {
2001                 long connAddr = conn.address();
2002                 timeoutFuture = null;
2003                 // Notify quiche there was a timeout.
2004                 Quiche.quiche_conn_on_timeout(connAddr);
2005                 if (!freeIfClosed()) {
2006                     // We need to call connectionSend when a timeout was triggered.
2007                     // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send.
2008                     if (connectionSend(conn) != SendResult.NONE) {
2009                         flushParent();
2010                     }
2011                     boolean closed = freeIfClosed();
2012                     if (!closed) {
2013                         // The connection is alive, reschedule.
2014                         scheduleTimeout();
2015                     }
2016                 }
2017             }
2018         }
2019 
2020         // Schedule timeout.
2021         // See https://docs.rs/quiche/0.6.0/quiche/#generating-outgoing-packets
2022         void scheduleTimeout() {
2023             QuicheQuicConnection conn = connection;
2024             if (conn.isFreed()) {
2025                 cancel();
2026                 return;
2027             }
2028             if (conn.isClosed()) {
2029                 cancel();
2030                 unsafe().close(newPromise());
2031                 return;
2032             }
2033             long nanos = Quiche.quiche_conn_timeout_as_nanos(conn.address());
2034             if (nanos < 0 || nanos == Long.MAX_VALUE) {
2035                 // No timeout needed.
2036                 cancel();
2037                 return;
2038             }
2039             if (timeoutFuture == null) {
2040                 timeoutFuture = eventLoop().schedule(this,
2041                         nanos, TimeUnit.NANOSECONDS);
2042             } else {
2043                 long remaining = timeoutFuture.getDelay(TimeUnit.NANOSECONDS);
2044                 if (remaining <= 0) {
2045                     // This means the timer already elapsed. In this case just cancel the future and call run()
2046                     // directly. This will ensure we correctly call quiche_conn_on_timeout() etc.
2047                     cancel();
2048                     run();
2049                 } else if (remaining > nanos) {
2050                     // The new timeout is smaller then what was scheduled before. Let's cancel the old timeout
2051                     // and schedule a new one.
2052                     cancel();
2053                     timeoutFuture = eventLoop().schedule(this, nanos, TimeUnit.NANOSECONDS);
2054                 }
2055             }
2056         }
2057 
2058         void cancel() {
2059             if (timeoutFuture != null) {
2060                 timeoutFuture.cancel(false);
2061                 timeoutFuture = null;
2062             }
2063         }
2064     }
2065 
2066     @Override
2067     public Future<QuicConnectionStats> collectStats(Promise<QuicConnectionStats> promise) {
2068         if (eventLoop().inEventLoop()) {
2069             collectStats0(promise);
2070         } else {
2071             eventLoop().execute(() -> collectStats0(promise));
2072         }
2073         return promise;
2074     }
2075 
2076     private void collectStats0(Promise<QuicConnectionStats> promise) {
2077         QuicheQuicConnection conn = connection;
2078         if (conn.isFreed()) {
2079             promise.setSuccess(statsAtClose);
2080             return;
2081         }
2082 
2083         collectStats0(connection, promise);
2084     }
2085 
2086     @Nullable
2087     private QuicConnectionStats collectStats0(QuicheQuicConnection connection, Promise<QuicConnectionStats> promise) {
2088         final long[] stats = Quiche.quiche_conn_stats(connection.address());
2089         if (stats == null) {
2090             promise.setFailure(new IllegalStateException("native quiche_conn_stats(...) failed"));
2091             return null;
2092         }
2093 
2094         final QuicheQuicConnectionStats connStats =
2095             new QuicheQuicConnectionStats(stats);
2096         promise.setSuccess(connStats);
2097         return connStats;
2098     }
2099 
2100     @Override
2101     public Future<QuicConnectionPathStats> collectPathStats(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2102         if (eventLoop().inEventLoop()) {
2103             collectPathStats0(pathIdx, promise);
2104         } else {
2105             eventLoop().execute(() -> collectPathStats0(pathIdx, promise));
2106         }
2107         return promise;
2108     }
2109 
2110     private void collectPathStats0(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2111         QuicheQuicConnection conn = connection;
2112         if (conn.isFreed()) {
2113             promise.setFailure(new IllegalStateException("Connection is closed"));
2114             return;
2115         }
2116 
2117         final Object[] stats = Quiche.quiche_conn_path_stats(connection.address(), pathIdx);
2118         if (stats == null) {
2119             promise.setFailure(new IllegalStateException("native quiche_conn_path_stats(...) failed"));
2120             return;
2121         }
2122         promise.setSuccess(new QuicheQuicConnectionPathStats(stats));
2123     }
2124 
2125     @Override
2126     public QuicTransportParameters peerTransportParameters() {
2127         return connection.peerParameters();
2128     }
2129 }