View Javadoc
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.quic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.AbstractChannel;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOption;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.RecvByteBufAllocator;
35  import io.netty.channel.socket.DatagramPacket;
36  import io.netty.handler.ssl.SniCompletionEvent;
37  import io.netty.handler.ssl.SslHandshakeCompletionEvent;
38  import io.netty.util.AttributeKey;
39  import io.netty.util.collection.LongObjectHashMap;
40  import io.netty.util.collection.LongObjectMap;
41  import io.netty.util.concurrent.Future;
42  import io.netty.util.concurrent.ImmediateEventExecutor;
43  import io.netty.util.concurrent.ImmediateExecutor;
44  import io.netty.util.concurrent.Promise;
45  import io.netty.util.internal.StringUtil;
46  import io.netty.util.internal.logging.InternalLogger;
47  import io.netty.util.internal.logging.InternalLoggerFactory;
48  import org.jetbrains.annotations.Nullable;
49  
50  import javax.net.ssl.SSLEngine;
51  import javax.net.ssl.SSLHandshakeException;
52  import java.io.File;
53  import java.net.ConnectException;
54  import java.net.InetSocketAddress;
55  import java.net.SocketAddress;
56  import java.nio.BufferUnderflowException;
57  import java.nio.ByteBuffer;
58  import java.nio.channels.AlreadyConnectedException;
59  import java.nio.channels.ClosedChannelException;
60  import java.nio.channels.ConnectionPendingException;
61  import java.util.ArrayList;
62  import java.util.Collections;
63  import java.util.HashSet;
64  import java.util.List;
65  import java.util.Map;
66  import java.util.Set;
67  import java.util.concurrent.Executor;
68  import java.util.concurrent.ScheduledFuture;
69  import java.util.concurrent.TimeUnit;
70  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
71  import java.util.function.Consumer;
72  import java.util.function.Function;
73  
74  /**
75   * {@link QuicChannel} implementation that uses <a href="https://github.com/cloudflare/quiche">quiche</a>.
76   */
77  final class QuicheQuicChannel extends AbstractChannel implements QuicChannel {
78      private static final InternalLogger logger = InternalLoggerFactory.getInstance(QuicheQuicChannel.class);
79      private static final String QLOG_FILE_EXTENSION = ".qlog";
80  
81      enum StreamRecvResult {
82          /**
83           * Nothing more to read from the stream.
84           */
85          DONE,
86          /**
87           * FIN flag received.
88           */
89          FIN,
90          /**
91           * Normal read without FIN flag.
92           */
93          OK
94      }
95  
96      private enum ChannelState {
97          OPEN,
98          ACTIVE,
99          CLOSED
100     }
101 
102     private enum SendResult {
103         SOME,
104         NONE,
105         CLOSE
106     }
107 
108     private static final class CloseData implements ChannelFutureListener {
109         final boolean applicationClose;
110         final int err;
111         final ByteBuf reason;
112 
113         CloseData(boolean applicationClose, int err, ByteBuf reason) {
114             this.applicationClose = applicationClose;
115             this.err = err;
116             this.reason = reason;
117         }
118 
119         @Override
120         public void operationComplete(ChannelFuture future) {
121             reason.release();
122         }
123     }
124 
125     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
126     private long[] readableStreams = new long[4];
127     private long[] writableStreams = new long[4];
128     private final LongObjectMap<QuicheQuicStreamChannel> streams = new LongObjectHashMap<>();
129     private final QuicheQuicChannelConfig config;
130     private final boolean server;
131     private final QuicStreamIdGenerator idGenerator;
132     private final ChannelHandler streamHandler;
133     private final Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray;
134     private final Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray;
135     private final TimeoutHandler timeoutHandler;
136     private final QuicConnectionIdGenerator connectionIdAddressGenerator;
137     private final QuicResetTokenGenerator resetTokenGenerator;
138     private final Set<ByteBuffer> sourceConnectionIds = new HashSet<>();
139 
140     private Consumer<QuicheQuicChannel> freeTask;
141     private Executor sslTaskExecutor;
142     private boolean inFireChannelReadCompleteQueue;
143     private boolean fireChannelReadCompletePending;
144     private ByteBuf finBuffer;
145     private ByteBuf outErrorCodeBuffer;
146     private ChannelPromise connectPromise;
147     private ScheduledFuture<?> connectTimeoutFuture;
148     private QuicConnectionAddress connectLocalAddress;
149     private QuicConnectionAddress connectRemoteAddress;
150     private CloseData closeData;
151     private QuicConnectionCloseEvent connectionCloseEvent;
152     private QuicConnectionStats statsAtClose;
153     private boolean supportsDatagram;
154     private boolean recvDatagramPending;
155     private boolean datagramReadable;
156     private boolean recvStreamPending;
157     private boolean streamReadable;
158     private boolean handshakeCompletionNotified;
159     private boolean earlyDataReadyNotified;
160     private int reantranceGuard;
161     private static final int IN_RECV = 1 << 1;
162     private static final int IN_CONNECTION_SEND = 1 << 2;
163     private static final int IN_HANDLE_WRITABLE_STREAMS = 1 << 3;
164     private volatile ChannelState state = ChannelState.OPEN;
165     private volatile boolean timedOut;
166     private volatile String traceId;
167     private volatile QuicheQuicConnection connection;
168     private volatile InetSocketAddress local;
169     private volatile InetSocketAddress remote;
170 
171     private final ChannelFutureListener continueSendingListener = f -> {
172         if (connectionSend(connection) != SendResult.NONE) {
173             flushParent();
174         }
175     };
176 
177     private static final AtomicLongFieldUpdater<QuicheQuicChannel> UNI_STREAMS_LEFT_UPDATER =
178             AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "uniStreamsLeft");
179     private volatile long uniStreamsLeft;
180 
181     private static final AtomicLongFieldUpdater<QuicheQuicChannel> BIDI_STREAMS_LEFT_UPDATER =
182             AtomicLongFieldUpdater.newUpdater(QuicheQuicChannel.class, "bidiStreamsLeft");
183     private volatile long bidiStreamsLeft;
184 
185     private QuicheQuicChannel(Channel parent, boolean server, @Nullable ByteBuffer key, InetSocketAddress local,
186                               InetSocketAddress remote, boolean supportsDatagram, ChannelHandler streamHandler,
187                               Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
188                               Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
189                               @Nullable Consumer<QuicheQuicChannel> freeTask,
190                               @Nullable Executor sslTaskExecutor,
191                               @Nullable QuicConnectionIdGenerator connectionIdAddressGenerator,
192                               @Nullable QuicResetTokenGenerator resetTokenGenerator) {
193         super(parent);
194         config = new QuicheQuicChannelConfig(this);
195         this.freeTask = freeTask;
196         this.server = server;
197         this.idGenerator = new QuicStreamIdGenerator(server);
198         this.connectionIdAddressGenerator = connectionIdAddressGenerator;
199         this.resetTokenGenerator = resetTokenGenerator;
200         if (key != null) {
201             this.sourceConnectionIds.add(key);
202         }
203 
204         this.supportsDatagram = supportsDatagram;
205         this.local = local;
206         this.remote = remote;
207 
208         this.streamHandler = streamHandler;
209         this.streamOptionsArray = streamOptionsArray;
210         this.streamAttrsArray = streamAttrsArray;
211         timeoutHandler = new TimeoutHandler();
212         this.sslTaskExecutor = sslTaskExecutor == null ? ImmediateExecutor.INSTANCE : sslTaskExecutor;
213     }
214 
215     static QuicheQuicChannel forClient(Channel parent, InetSocketAddress local, InetSocketAddress remote,
216                                        ChannelHandler streamHandler,
217                                        Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
218                                        Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray) {
219         return new QuicheQuicChannel(parent, false, null, local, remote, false, streamHandler,
220                 streamOptionsArray, streamAttrsArray, null, null, null, null);
221     }
222 
223     static QuicheQuicChannel forServer(Channel parent, ByteBuffer key, InetSocketAddress local,
224                                        InetSocketAddress remote,
225                                        boolean supportsDatagram, ChannelHandler streamHandler,
226                                        Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
227                                        Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray,
228                                        Consumer<QuicheQuicChannel> freeTask, Executor sslTaskExecutor,
229                                        QuicConnectionIdGenerator connectionIdAddressGenerator,
230                                        QuicResetTokenGenerator resetTokenGenerator) {
231         return new QuicheQuicChannel(parent, true, key, local, remote, supportsDatagram,
232                 streamHandler, streamOptionsArray, streamAttrsArray, freeTask,
233                 sslTaskExecutor, connectionIdAddressGenerator, resetTokenGenerator);
234     }
235 
236     private static final int MAX_ARRAY_LEN = 128;
237 
238     private static long[] growIfNeeded(long[] array, int maxLength) {
239         if (maxLength > array.length) {
240             if (array.length == MAX_ARRAY_LEN) {
241                 return array;
242             }
243             // Increase by 4 until we reach MAX_ARRAY_LEN
244             return new long[Math.min(MAX_ARRAY_LEN, array.length + 4)];
245         }
246         return array;
247     }
248 
249     @Override
250     public boolean isTimedOut() {
251         return timedOut;
252     }
253 
254     @Override
255     public SSLEngine sslEngine() {
256         QuicheQuicConnection connection = this.connection;
257         return connection == null ? null : connection.engine();
258     }
259 
260     private void notifyAboutHandshakeCompletionIfNeeded(QuicheQuicConnection conn,
261                                                         @Nullable SSLHandshakeException cause) {
262         if (handshakeCompletionNotified) {
263             return;
264         }
265         if (cause != null) {
266             pipeline().fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
267             return;
268         }
269         if (conn.isFreed()) {
270             return;
271         }
272         switch (connection.engine().getHandshakeStatus()) {
273             case NOT_HANDSHAKING:
274             case FINISHED:
275                 handshakeCompletionNotified = true;
276                 pipeline().fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
277                 break;
278             default:
279                 break;
280         }
281     }
282 
283     @Override
284     public long peerAllowedStreams(QuicStreamType type) {
285         switch (type) {
286             case BIDIRECTIONAL:
287                 return bidiStreamsLeft;
288             case UNIDIRECTIONAL:
289                 return uniStreamsLeft;
290             default:
291                 return 0;
292         }
293     }
294 
295     void attachQuicheConnection(QuicheQuicConnection connection) {
296         this.connection = connection;
297 
298         byte[] traceId = Quiche.quiche_conn_trace_id(connection.address());
299         if (traceId != null) {
300             this.traceId = new String(traceId);
301         }
302 
303         connection.init(local, remote,
304                 sniHostname -> pipeline().fireUserEventTriggered(new SniCompletionEvent(sniHostname)));
305 
306         // Setup QLOG if needed.
307         QLogConfiguration configuration = config.getQLogConfiguration();
308         if (configuration != null) {
309             final String fileName;
310             File file = new File(configuration.path());
311             if (file.isDirectory()) {
312                 // Create directory if needed.
313                 file.mkdir();
314                 if (this.traceId != null) {
315                     fileName = configuration.path() + File.separatorChar + this.traceId + "-" +
316                             id().asShortText() + QLOG_FILE_EXTENSION;
317                 } else {
318                     fileName = configuration.path() + File.separatorChar + id().asShortText() + QLOG_FILE_EXTENSION;
319                 }
320             } else {
321                 fileName = configuration.path();
322             }
323 
324             if (!Quiche.quiche_conn_set_qlog_path(connection.address(), fileName,
325                     configuration.logTitle(), configuration.logDescription())) {
326                 logger.info("Unable to create qlog file: {} ", fileName);
327             }
328         }
329     }
330 
331     void connectNow(Function<QuicChannel, ? extends QuicSslEngine> engineProvider, Executor sslTaskExecutor,
332                     Consumer<QuicheQuicChannel> freeTask, long configAddr, int localConnIdLength,
333                     boolean supportsDatagram, ByteBuffer fromSockaddrMemory, ByteBuffer toSockaddrMemory)
334             throws Exception {
335         assert this.connection == null;
336         assert this.traceId == null;
337         assert this.sourceConnectionIds.isEmpty();
338 
339         this.sslTaskExecutor = sslTaskExecutor;
340         this.freeTask = freeTask;
341 
342         QuicConnectionAddress connectLocalAddress = this.connectLocalAddress;
343 
344         if (connectLocalAddress == QuicConnectionAddress.EPHEMERAL) {
345             connectLocalAddress = QuicConnectionAddress.random(localConnIdLength);
346         }
347         ByteBuffer localConnectId = connectLocalAddress.id();
348         if (localConnectId.remaining() != localConnIdLength) {
349             failConnectPromiseAndThrow(new IllegalArgumentException("connectionAddress has length "
350                     + localConnectId.remaining()
351                     + " instead of " + localConnIdLength));
352         }
353         QuicConnectionAddress connectRemoteAddress = this.connectRemoteAddress;
354 
355         QuicSslEngine engine = engineProvider.apply(this);
356         if (!(engine instanceof QuicheQuicSslEngine)) {
357             failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not of type "
358                     + QuicheQuicSslEngine.class.getSimpleName()));
359             return;
360         }
361         if (!engine.getUseClientMode()) {
362             failConnectPromiseAndThrow(new IllegalArgumentException("QuicSslEngine is not create in client mode"));
363         }
364         QuicheQuicSslEngine quicheEngine = (QuicheQuicSslEngine) engine;
365         ByteBuf localIdBuffer = alloc().directBuffer(localConnectId.remaining()).writeBytes(localConnectId.duplicate());
366         ByteBuf remoteIdBuffer;
367         if (connectRemoteAddress != QuicConnectionAddress.EPHEMERAL) {
368             ByteBuffer remoteId = connectRemoteAddress.id();
369             remoteIdBuffer = alloc().directBuffer(remoteId.remaining()).writeBytes(remoteId.duplicate());
370         } else {
371             remoteIdBuffer = null;
372         }
373         try {
374             int fromSockaddrLen = SockaddrIn.setAddress(fromSockaddrMemory, local);
375             int toSockaddrLen = SockaddrIn.setAddress(toSockaddrMemory, remote);
376             QuicheQuicConnection connection = quicheEngine.createConnection(ssl -> {
377                 long localIdMemoryAddress = Quiche.readerMemoryAddress(localIdBuffer);
378                 int localIdLength = localIdBuffer.readableBytes();
379 
380                 final long remoteIdMemoryAddress;
381                 final int remoteIdLength;
382                 if (remoteIdBuffer == null) {
383                     return Quiche.quiche_conn_new_with_tls(localIdMemoryAddress, localIdLength,
384                             -1, -1,
385                             Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
386                             Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
387                             configAddr, ssl, false);
388                 } else {
389                     return Quiche.quiche_conn_new_with_tls_and_client_dcid(localIdMemoryAddress, localIdLength,
390                             Quiche.readerMemoryAddress(remoteIdBuffer), remoteIdBuffer.readableBytes(),
391                             Quiche.memoryAddressWithPosition(fromSockaddrMemory), fromSockaddrLen,
392                             Quiche.memoryAddressWithPosition(toSockaddrMemory), toSockaddrLen,
393                             configAddr, ssl);
394                 }
395             });
396             if (connection == null) {
397                 failConnectPromiseAndThrow(new ConnectException());
398                 return;
399             }
400             attachQuicheConnection(connection);
401             QuicClientSessionCache sessionCache = quicheEngine.ctx.getSessionCache();
402             if (sessionCache != null) {
403                 byte[] sessionBytes = sessionCache
404                         .getSession(quicheEngine.getSession().getPeerHost(), quicheEngine.getSession().getPeerPort());
405                 if (sessionBytes != null) {
406                     Quiche.quiche_conn_set_session(connection.address(), sessionBytes);
407                 }
408             }
409             this.supportsDatagram = supportsDatagram;
410             sourceConnectionIds.add(localConnectId);
411         } finally {
412             localIdBuffer.release();
413             if (remoteIdBuffer != null) {
414                 remoteIdBuffer.release();
415             }
416         }
417     }
418 
419     private void failConnectPromiseAndThrow(Exception e) throws Exception {
420         tryFailConnectPromise(e);
421         throw e;
422     }
423 
424     private boolean tryFailConnectPromise(Exception e) {
425         ChannelPromise promise = connectPromise;
426         if (promise != null) {
427             connectPromise = null;
428             promise.tryFailure(e);
429             return true;
430         }
431         return false;
432     }
433 
434     Set<ByteBuffer> sourceConnectionIds() {
435         return sourceConnectionIds;
436     }
437 
438     boolean markInFireChannelReadCompleteQueue() {
439         if (inFireChannelReadCompleteQueue) {
440             return false;
441         }
442         inFireChannelReadCompleteQueue = true;
443         return true;
444     }
445 
446     private void failPendingConnectPromise() {
447         ChannelPromise promise = QuicheQuicChannel.this.connectPromise;
448         if (promise != null) {
449             QuicheQuicChannel.this.connectPromise = null;
450             promise.tryFailure(new QuicClosedChannelException(this.connectionCloseEvent));
451         }
452     }
453 
454     void forceClose() {
455         unsafe().close(voidPromise());
456     }
457 
458     @Override
459     protected DefaultChannelPipeline newChannelPipeline() {
460         return new DefaultChannelPipeline(this) {
461             @Override
462             protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
463                 if (msg instanceof QuicStreamChannel) {
464                     QuicStreamChannel channel = (QuicStreamChannel) msg;
465                     Quic.setupChannel(channel, streamOptionsArray, streamAttrsArray, streamHandler, logger);
466                     ctx.channel().eventLoop().register(channel);
467                 } else {
468                     super.onUnhandledInboundMessage(ctx, msg);
469                 }
470             }
471         };
472     }
473 
474     @Override
475     public QuicChannel flush() {
476         super.flush();
477         return this;
478     }
479 
480     @Override
481     public QuicChannel read() {
482         super.read();
483         return this;
484     }
485 
486     @Override
487     public Future<QuicStreamChannel> createStream(QuicStreamType type, @Nullable ChannelHandler handler,
488                                                   Promise<QuicStreamChannel> promise) {
489         if (eventLoop().inEventLoop()) {
490             ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise);
491         } else {
492             eventLoop().execute(() -> ((QuicChannelUnsafe) unsafe()).connectStream(type, handler, promise));
493         }
494         return promise;
495     }
496 
497     @Override
498     public ChannelFuture close(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
499         if (eventLoop().inEventLoop()) {
500             close0(applicationClose, error, reason, promise);
501         } else {
502             eventLoop().execute(() -> close0(applicationClose, error, reason, promise));
503         }
504         return promise;
505     }
506 
507     private void close0(boolean applicationClose, int error, ByteBuf reason, ChannelPromise promise) {
508         if (closeData == null) {
509             if (!reason.hasMemoryAddress()) {
510                 // Copy to direct buffer as that's what we need.
511                 ByteBuf copy = alloc().directBuffer(reason.readableBytes()).writeBytes(reason);
512                 reason.release();
513                 reason = copy;
514             }
515             closeData = new CloseData(applicationClose, error, reason);
516             promise.addListener(closeData);
517         } else {
518             // We already have a close scheduled that uses a close data. Lets release the buffer early.
519             reason.release();
520         }
521         close(promise);
522     }
523 
524     @Override
525     public String toString() {
526         String traceId = this.traceId;
527         if (traceId == null) {
528             return "()" + super.toString();
529         } else {
530             return '(' + traceId + ')' + super.toString();
531         }
532     }
533 
534     @Override
535     protected AbstractUnsafe newUnsafe() {
536         return new QuicChannelUnsafe();
537     }
538 
539     @Override
540     protected boolean isCompatible(EventLoop eventLoop) {
541         return parent().eventLoop() == eventLoop;
542     }
543 
544     @Override
545     @Nullable
546     protected QuicConnectionAddress localAddress0() {
547         QuicheQuicConnection connection = this.connection;
548         return connection == null ? null : connection.sourceId();
549     }
550 
551     @Override
552     @Nullable
553     protected QuicConnectionAddress remoteAddress0() {
554         QuicheQuicConnection connection = this.connection;
555         return connection == null ? null : connection.destinationId();
556     }
557 
558     @Override
559     @Nullable
560     public QuicConnectionAddress localAddress() {
561         // Override so we never cache as the sourceId() can change over life-time.
562         return localAddress0();
563     }
564 
565     @Override
566     @Nullable
567     public QuicConnectionAddress remoteAddress() {
568         // Override so we never cache as the destinationId() can change over life-time.
569         return remoteAddress0();
570     }
571 
572     @Override
573     @Nullable
574     public SocketAddress localSocketAddress() {
575         return local;
576     }
577 
578     @Override
579     @Nullable
580     public SocketAddress remoteSocketAddress() {
581         return remote;
582     }
583 
584     @Override
585     protected void doBind(SocketAddress socketAddress) {
586         throw new UnsupportedOperationException();
587     }
588 
589     @Override
590     protected void doDisconnect() throws Exception {
591         doClose();
592     }
593 
594     @Override
595     protected void doClose() throws Exception {
596         if (state == ChannelState.CLOSED) {
597             return;
598         }
599         state = ChannelState.CLOSED;
600 
601         QuicheQuicConnection conn = this.connection;
602         if (conn == null || conn.isFreed()) {
603             if (closeData != null) {
604                 closeData.reason.release();
605                 closeData = null;
606             }
607             failPendingConnectPromise();
608             return;
609         }
610 
611         // Call connectionSend() so we ensure we send all that is queued before we close the channel
612         SendResult sendResult = connectionSend(conn);
613 
614         final boolean app;
615         final int err;
616         final ByteBuf reason;
617         if (closeData == null) {
618             app = false;
619             err = 0;
620             reason = Unpooled.EMPTY_BUFFER;
621         } else {
622             app = closeData.applicationClose;
623             err = closeData.err;
624             reason = closeData.reason;
625             closeData = null;
626         }
627 
628         failPendingConnectPromise();
629         try {
630             int res = Quiche.quiche_conn_close(conn.address(), app, err,
631                     Quiche.readerMemoryAddress(reason), reason.readableBytes());
632             if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
633                 throw Quiche.convertToException(res);
634             }
635             // As we called quiche_conn_close(...) we need to ensure we will call quiche_conn_send(...) either
636             // now or we will do so once we see the channelReadComplete event.
637             //
638             // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.close
639             if (connectionSend(conn) == SendResult.SOME) {
640                 sendResult = SendResult.SOME;
641             }
642         } finally {
643 
644             // making sure that connection statistics is available
645             // even after channel is closed
646             statsAtClose = collectStats0(conn, eventLoop().newPromise());
647             try {
648                 timedOut = Quiche.quiche_conn_is_timed_out(conn.address());
649 
650                 closeStreams();
651                 if (finBuffer != null) {
652                     finBuffer.release();
653                     finBuffer = null;
654                 }
655                 if (outErrorCodeBuffer != null) {
656                     outErrorCodeBuffer.release();
657                     outErrorCodeBuffer = null;
658                 }
659             } finally {
660                 if (sendResult == SendResult.SOME) {
661                     // As this is the close let us flush it asap.
662                     forceFlushParent();
663                 } else {
664                     flushParent();
665                 }
666                 conn.free();
667                 if (freeTask != null) {
668                     freeTask.accept(this);
669                 }
670                 timeoutHandler.cancel();
671 
672                 local = null;
673                 remote = null;
674             }
675         }
676     }
677 
678     @Override
679     protected void doBeginRead() {
680         recvDatagramPending = true;
681         recvStreamPending = true;
682         if (datagramReadable || streamReadable) {
683             ((QuicChannelUnsafe) unsafe()).recv();
684         }
685     }
686 
687     @Override
688     protected Object filterOutboundMessage(Object msg) {
689         if (msg instanceof ByteBuf) {
690             return msg;
691         }
692         throw new UnsupportedOperationException("Unsupported message type: " + StringUtil.simpleClassName(msg));
693     }
694 
695     @Override
696     protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) throws Exception {
697         if (!supportsDatagram) {
698             throw new UnsupportedOperationException("Datagram extension is not supported");
699         }
700         boolean sendSomething = false;
701         boolean retry = false;
702         QuicheQuicConnection conn = connection;
703         try {
704             for (;;) {
705                 ByteBuf buffer = (ByteBuf) channelOutboundBuffer.current();
706                 if (buffer == null) {
707                     break;
708                 }
709 
710                 int readable = buffer.readableBytes();
711                 if (readable == 0) {
712                     // Skip empty buffers.
713                     channelOutboundBuffer.remove();
714                     continue;
715                 }
716 
717                 final int res;
718                 if (!buffer.isDirect() || buffer.nioBufferCount() > 1) {
719                     ByteBuf tmpBuffer = alloc().directBuffer(readable);
720                     try {
721                         tmpBuffer.writeBytes(buffer, buffer.readerIndex(), readable);
722                         res = sendDatagram(conn, tmpBuffer);
723                     } finally {
724                         tmpBuffer.release();
725                     }
726                 } else {
727                     res = sendDatagram(conn, buffer);
728                 }
729                 if (res >= 0) {
730                     channelOutboundBuffer.remove();
731                     sendSomething = true;
732                     retry = false;
733                 } else {
734                     if (res == Quiche.QUICHE_ERR_BUFFER_TOO_SHORT) {
735                         retry = false;
736                         channelOutboundBuffer.remove(new BufferUnderflowException());
737                     } else if (res == Quiche.QUICHE_ERR_INVALID_STATE) {
738                         throw new UnsupportedOperationException("Remote peer does not support Datagram extension");
739                     } else if (res == Quiche.QUICHE_ERR_DONE) {
740                         if (retry) {
741                             // We already retried and it didn't work. Let's drop the datagrams on the floor.
742                             for (;;) {
743                                 if (!channelOutboundBuffer.remove()) {
744                                     // The buffer is empty now.
745                                     return;
746                                 }
747                             }
748                         }
749                         // Set sendSomething to false a we will call connectionSend() now.
750                         sendSomething = false;
751                         // If this returned DONE we couldn't write anymore. This happens if the internal queue
752                         // is full. In this case we should call quiche_conn_send(...) and so make space again.
753                         if (connectionSend(conn) != SendResult.NONE) {
754                             forceFlushParent();
755                         }
756                         // Let's try again to write the message.
757                         retry = true;
758                     } else {
759                         throw Quiche.convertToException(res);
760                     }
761                 }
762             }
763         } finally {
764             if (sendSomething && connectionSend(conn) != SendResult.NONE) {
765                 flushParent();
766             }
767         }
768     }
769 
770     private static int sendDatagram(QuicheQuicConnection conn, ByteBuf buf) throws ClosedChannelException {
771         return Quiche.quiche_conn_dgram_send(connectionAddressChecked(conn),
772                 Quiche.readerMemoryAddress(buf), buf.readableBytes());
773     }
774 
775     @Override
776     public QuicChannelConfig config() {
777         return config;
778     }
779 
780     @Override
781     public boolean isOpen() {
782         return state != ChannelState.CLOSED;
783     }
784 
785     @Override
786     public boolean isActive() {
787         return state == ChannelState.ACTIVE;
788     }
789 
790     @Override
791     public ChannelMetadata metadata() {
792         return METADATA;
793     }
794 
795     /**
796      * This may call {@link #flush()} on the parent channel if needed. The flush may be delayed until the read loop
797      * is over.
798      */
799     private void flushParent() {
800         if (!inFireChannelReadCompleteQueue) {
801             forceFlushParent();
802         }
803     }
804 
805     /**
806      * Call {@link #flush()} on the parent channel.
807      */
808     private void forceFlushParent() {
809         parent().flush();
810     }
811 
812     private static long connectionAddressChecked(@Nullable QuicheQuicConnection conn) throws ClosedChannelException {
813         if (conn == null || conn.isFreed()) {
814             throw new ClosedChannelException();
815         }
816         return conn.address();
817     }
818 
819     boolean freeIfClosed() {
820         QuicheQuicConnection conn = connection;
821         if (conn == null || conn.isFreed()) {
822             return true;
823         }
824         if (conn.isClosed()) {
825             unsafe().close(newPromise());
826             return true;
827         }
828         return false;
829     }
830 
831     private void closeStreams() {
832         if (streams.isEmpty()) {
833             return;
834         }
835         final ClosedChannelException closedChannelException;
836         if (isTimedOut()) {
837             // Close the streams because of a timeout.
838             closedChannelException = new QuicTimeoutClosedChannelException();
839         } else {
840             closedChannelException = new ClosedChannelException();
841         }
842         // Make a copy to ensure we not run into a situation when we change the underlying iterator from
843         // another method and so run in an assert error.
844         for (QuicheQuicStreamChannel stream: streams.values().toArray(new QuicheQuicStreamChannel[0])) {
845             stream.unsafe().close(closedChannelException, voidPromise());
846         }
847         streams.clear();
848     }
849 
850     void streamPriority(long streamId, byte priority, boolean incremental) throws Exception {
851        int res = Quiche.quiche_conn_stream_priority(connectionAddressChecked(connection), streamId,
852                priority, incremental);
853        if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
854            throw Quiche.convertToException(res);
855        }
856     }
857 
858     void streamClosed(long streamId) {
859         streams.remove(streamId);
860     }
861 
862     boolean isStreamLocalCreated(long streamId) {
863         return (streamId & 0x1) == (server ? 1 : 0);
864     }
865 
866     QuicStreamType streamType(long streamId) {
867         return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
868     }
869 
870     void streamShutdown(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
871         QuicheQuicConnection conn = this.connection;
872         final long connectionAddress;
873         try {
874             connectionAddress = connectionAddressChecked(conn);
875         } catch (ClosedChannelException e) {
876             promise.setFailure(e);
877             return;
878         }
879         int res = 0;
880         if (read) {
881             res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_READ, err);
882         }
883         if (write) {
884             res |= Quiche.quiche_conn_stream_shutdown(connectionAddress, streamId, Quiche.QUICHE_SHUTDOWN_WRITE, err);
885         }
886 
887         // As we called quiche_conn_stream_shutdown(...) we need to ensure we will call quiche_conn_send(...) either
888         // now or we will do so once we see the channelReadComplete event.
889         //
890         // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
891         if (connectionSend(conn) != SendResult.NONE) {
892             // Force the flush so the shutdown can be seen asap.
893             forceFlushParent();
894         }
895         if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
896             promise.setFailure(Quiche.convertToException(res));
897         } else {
898             promise.setSuccess();
899         }
900     }
901 
902     void streamSendFin(long streamId) throws Exception {
903         QuicheQuicConnection conn = connection;
904         try {
905             // Just write an empty buffer and set fin to true.
906             int res = streamSend0(conn, streamId, Unpooled.EMPTY_BUFFER, true);
907             if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
908                 throw Quiche.convertToException(res);
909             }
910         } finally {
911             // As we called quiche_conn_stream_send(...) we need to ensure we will call quiche_conn_send(...) either
912             // now or we will do so once we see the channelReadComplete event.
913             //
914             // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
915             if (connectionSend(conn) != SendResult.NONE) {
916                 flushParent();
917             }
918         }
919     }
920 
921     int streamSend(long streamId, ByteBuf buffer, boolean fin) throws ClosedChannelException {
922         QuicheQuicConnection conn = connection;
923         if (buffer.nioBufferCount() == 1) {
924             return streamSend0(conn, streamId, buffer, fin);
925         }
926         ByteBuffer[] nioBuffers  = buffer.nioBuffers();
927         int lastIdx = nioBuffers.length - 1;
928         int res = 0;
929         for (int i = 0; i < lastIdx; i++) {
930             ByteBuffer nioBuffer = nioBuffers[i];
931             while (nioBuffer.hasRemaining()) {
932                 int localRes = streamSend(conn, streamId, nioBuffer, false);
933                 if (localRes <= 0) {
934                     return res;
935                 }
936                 res += localRes;
937 
938                 nioBuffer.position(nioBuffer.position() + localRes);
939             }
940         }
941         int localRes = streamSend(conn, streamId, nioBuffers[lastIdx], fin);
942         if (localRes > 0) {
943             res += localRes;
944         }
945         return res;
946     }
947 
948     void connectionSendAndFlush() {
949         if (inFireChannelReadCompleteQueue || (reantranceGuard & IN_HANDLE_WRITABLE_STREAMS) != 0) {
950             return;
951         }
952         if (connectionSend(connection) != SendResult.NONE) {
953             flushParent();
954         }
955     }
956 
957     private int streamSend0(QuicheQuicConnection conn, long streamId, ByteBuf buffer, boolean fin)
958             throws ClosedChannelException {
959         return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
960                 Quiche.readerMemoryAddress(buffer), buffer.readableBytes(), fin);
961     }
962 
963     private int streamSend(QuicheQuicConnection conn, long streamId, ByteBuffer buffer, boolean fin)
964             throws ClosedChannelException {
965         return Quiche.quiche_conn_stream_send(connectionAddressChecked(conn), streamId,
966                 Quiche.memoryAddressWithPosition(buffer), buffer.remaining(), fin);
967     }
968 
969     StreamRecvResult streamRecv(long streamId, ByteBuf buffer) throws Exception {
970         QuicheQuicConnection conn = connection;
971         long connAddr = connectionAddressChecked(conn);
972         if (finBuffer == null) {
973             finBuffer = alloc().directBuffer(1);
974         }
975         if (outErrorCodeBuffer == null) {
976             outErrorCodeBuffer = alloc().directBuffer(8);
977         }
978         outErrorCodeBuffer.setLongLE(0, -1L);
979         int writerIndex = buffer.writerIndex();
980         int recvLen = Quiche.quiche_conn_stream_recv(connAddr, streamId,
981                 Quiche.writerMemoryAddress(buffer), buffer.writableBytes(), Quiche.writerMemoryAddress(finBuffer),
982                 Quiche.writerMemoryAddress(outErrorCodeBuffer));
983         long errorCode = outErrorCodeBuffer.getLongLE(0);
984         if (recvLen == Quiche.QUICHE_ERR_DONE) {
985             return StreamRecvResult.DONE;
986         } else if (recvLen < 0) {
987             throw Quiche.convertToException(recvLen, errorCode);
988         }
989         buffer.writerIndex(writerIndex + recvLen);
990         return finBuffer.getBoolean(0) ? StreamRecvResult.FIN : StreamRecvResult.OK;
991     }
992 
993     /**
994      * Receive some data on a QUIC connection.
995      */
996     void recv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
997         ((QuicChannelUnsafe) unsafe()).connectionRecv(sender, recipient, buffer);
998     }
999 
1000     /**
1001      * Return all source connection ids that are retired and so should be removed to map to the channel.
1002      *
1003      * @return retired ids.
1004      */
1005     List<ByteBuffer> retiredSourceConnectionId() {
1006         QuicheQuicConnection connection = this.connection;
1007         if (connection == null || connection.isFreed()) {
1008             return Collections.emptyList();
1009         }
1010         long connAddr = connection.address();
1011         assert connAddr != -1;
1012         List<ByteBuffer> retiredSourceIds = null;
1013         for (;;) {
1014             byte[] retired = Quiche.quiche_conn_retired_scid_next(connAddr);
1015             if (retired == null) {
1016                 break;
1017             }
1018             if (retiredSourceIds == null) {
1019                 retiredSourceIds = new ArrayList<>();
1020             }
1021             ByteBuffer retiredId = ByteBuffer.wrap(retired);
1022             retiredSourceIds.add(retiredId);
1023             sourceConnectionIds.remove(retiredId);
1024         }
1025         if (retiredSourceIds == null) {
1026             return Collections.emptyList();
1027         }
1028         return retiredSourceIds;
1029     }
1030 
1031     List<ByteBuffer> newSourceConnectionIds() {
1032         if (connectionIdAddressGenerator != null && resetTokenGenerator != null) {
1033             QuicheQuicConnection connection = this.connection;
1034             if (connection == null || connection.isFreed()) {
1035                 return Collections.emptyList();
1036             }
1037             long connAddr = connection.address();
1038             // Generate all extra source ids that we can provide. This will cause frames that need to be sent. Which
1039             // is the reason why we might need to call connectionSendAndFlush().
1040             int left = Quiche.quiche_conn_scids_left(connAddr);
1041             if (left > 0) {
1042                 QuicConnectionAddress sourceAddr = connection.sourceId();
1043                 if (sourceAddr == null) {
1044                     return Collections.emptyList();
1045                 }
1046                 List<ByteBuffer> generatedIds = new ArrayList<>(left);
1047                 boolean sendAndFlush = false;
1048                 ByteBuffer key = sourceAddr.id();
1049                 ByteBuf connIdBuffer = alloc().directBuffer(key.remaining());
1050 
1051                 byte[] resetTokenArray = new byte[Quic.RESET_TOKEN_LEN];
1052                 try {
1053                     do {
1054                         ByteBuffer srcId = connectionIdAddressGenerator.newId(key.duplicate(), key.remaining())
1055                                 .asReadOnlyBuffer();
1056                         connIdBuffer.clear();
1057                         connIdBuffer.writeBytes(srcId.duplicate());
1058                         ByteBuffer resetToken = resetTokenGenerator.newResetToken(srcId.duplicate());
1059                         resetToken.get(resetTokenArray);
1060                         long result = Quiche.quiche_conn_new_scid(
1061                                 connAddr, Quiche.memoryAddress(connIdBuffer, 0, connIdBuffer.readableBytes()),
1062                                 connIdBuffer.readableBytes(), resetTokenArray, false, -1);
1063                         if (result < 0) {
1064                             break;
1065                         }
1066                         sendAndFlush = true;
1067                         generatedIds.add(srcId.duplicate());
1068                         sourceConnectionIds.add(srcId);
1069                     } while (--left > 0);
1070                 } finally {
1071                     connIdBuffer.release();
1072                 }
1073 
1074                 if (sendAndFlush) {
1075                     connectionSendAndFlush();
1076                 }
1077                 return generatedIds;
1078             }
1079         }
1080         return Collections.emptyList();
1081     }
1082 
1083     void writable() {
1084         QuicheQuicConnection conn = connection;
1085         SendResult result = connectionSend(conn);
1086         handleWritableStreams(conn);
1087         if (connectionSend(conn) == SendResult.SOME) {
1088             result = SendResult.SOME;
1089         }
1090         if (result == SendResult.SOME) {
1091             // The writability changed so lets flush as fast as possible.
1092             forceFlushParent();
1093         }
1094         freeIfClosed();
1095     }
1096 
1097     long streamCapacity(long streamId) {
1098         QuicheQuicConnection conn = connection;
1099         if (conn.isClosed()) {
1100             return 0;
1101         }
1102         return Quiche.quiche_conn_stream_capacity(conn.address(), streamId);
1103     }
1104 
1105     private boolean handleWritableStreams(QuicheQuicConnection conn) {
1106         if (conn.isFreed()) {
1107             return false;
1108         }
1109         reantranceGuard |= IN_HANDLE_WRITABLE_STREAMS;
1110         try {
1111             long connAddr = conn.address();
1112             boolean mayNeedWrite = false;
1113 
1114             if (Quiche.quiche_conn_is_established(connAddr) ||
1115                     Quiche.quiche_conn_is_in_early_data(connAddr)) {
1116                 long writableIterator = Quiche.quiche_conn_writable(connAddr);
1117 
1118                 int totalWritable = 0;
1119                 try {
1120                     // For streams we always process all streams when at least on read was requested.
1121                     for (;;) {
1122                         int writable = Quiche.quiche_stream_iter_next(
1123                                 writableIterator, writableStreams);
1124                         for (int i = 0; i < writable; i++) {
1125                             long streamId = writableStreams[i];
1126                             QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1127                             if (streamChannel != null) {
1128                                 long capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
1129                                 if (streamChannel.writable(capacity)) {
1130                                     mayNeedWrite = true;
1131                                 }
1132                             }
1133                         }
1134                         if (writable > 0) {
1135                             totalWritable += writable;
1136                         }
1137                         if (writable < writableStreams.length) {
1138                             // We did handle all writable streams.
1139                             break;
1140                         }
1141                     }
1142                 } finally {
1143                     Quiche.quiche_stream_iter_free(writableIterator);
1144                 }
1145                 writableStreams = growIfNeeded(writableStreams, totalWritable);
1146             }
1147             return mayNeedWrite;
1148         } finally {
1149             reantranceGuard &= ~IN_HANDLE_WRITABLE_STREAMS;
1150         }
1151     }
1152 
1153     /**
1154      * Called once we receive a channelReadComplete event. This method will take care of calling
1155      * {@link ChannelPipeline#fireChannelReadComplete()} if needed and also to handle pending flushes of
1156      * writable {@link QuicheQuicStreamChannel}s.
1157      */
1158     void recvComplete() {
1159         try {
1160             QuicheQuicConnection conn = connection;
1161             if (conn.isFreed()) {
1162                 // Ensure we flush all pending writes.
1163                 forceFlushParent();
1164                 return;
1165             }
1166             fireChannelReadCompleteIfNeeded();
1167 
1168             // If we had called recv we need to ensure we call send as well.
1169             // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
1170             connectionSend(conn);
1171 
1172             // We are done with the read loop, flush all pending writes now.
1173             forceFlushParent();
1174             freeIfClosed();
1175         } finally {
1176             inFireChannelReadCompleteQueue = false;
1177         }
1178     }
1179 
1180     private void fireChannelReadCompleteIfNeeded() {
1181         if (fireChannelReadCompletePending) {
1182             fireChannelReadCompletePending = false;
1183             pipeline().fireChannelReadComplete();
1184         }
1185     }
1186 
1187     private void fireExceptionEvents(QuicheQuicConnection conn, Throwable cause) {
1188         if (cause instanceof SSLHandshakeException) {
1189             notifyAboutHandshakeCompletionIfNeeded(conn, (SSLHandshakeException) cause);
1190         }
1191         pipeline().fireExceptionCaught(cause);
1192     }
1193 
1194     private boolean runTasksDirectly() {
1195         return sslTaskExecutor == null || sslTaskExecutor == ImmediateExecutor.INSTANCE ||
1196                 sslTaskExecutor == ImmediateEventExecutor.INSTANCE;
1197     }
1198 
1199     private void runAllTaskSend(QuicheQuicConnection conn, Runnable task) {
1200         sslTaskExecutor.execute(decorateTaskSend(conn, task));
1201     }
1202 
1203     private void runAll(QuicheQuicConnection conn, Runnable task) {
1204         do {
1205             task.run();
1206         } while ((task = conn.sslTask()) != null);
1207     }
1208 
1209     private Runnable decorateTaskSend(QuicheQuicConnection conn, Runnable task) {
1210         return () -> {
1211             try {
1212                 runAll(conn, task);
1213             } finally {
1214                 // Move back to the EventLoop.
1215                 eventLoop().execute(() -> {
1216                     // Call connection send to continue handshake if needed.
1217                     if (connectionSend(conn) != SendResult.NONE) {
1218                         forceFlushParent();
1219                     }
1220                     freeIfClosed();
1221                 });
1222             }
1223         };
1224     }
1225 
1226     private SendResult connectionSendSegments(QuicheQuicConnection conn,
1227                                               SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
1228         if (conn.isClosed()) {
1229             return SendResult.NONE;
1230         }
1231         List<ByteBuf> bufferList = new ArrayList<>(segmentedDatagramPacketAllocator.maxNumSegments());
1232         long connAddr = conn.address();
1233         int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1234         SendResult sendResult = SendResult.NONE;
1235         boolean close = false;
1236         try {
1237             for (;;) {
1238                 int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1239                 ByteBuf out = alloc().directBuffer(len);
1240 
1241                 ByteBuffer sendInfo = conn.nextSendInfo();
1242                 InetSocketAddress sendToAddress = this.remote;
1243 
1244                 int writerIndex = out.writerIndex();
1245                 int written = Quiche.quiche_conn_send(
1246                         connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1247                         Quiche.memoryAddressWithPosition(sendInfo));
1248                 if (written == 0) {
1249                     out.release();
1250                     // No need to create a new datagram packet. Just try again.
1251                     continue;
1252                 }
1253                 final boolean done;
1254                 if (written < 0) {
1255                     done = true;
1256                     if (written != Quiche.QUICHE_ERR_DONE) {
1257                         close = Quiche.shouldClose(written);
1258                         Exception e = Quiche.convertToException(written);
1259                         if (!tryFailConnectPromise(e)) {
1260                             // Only fire through the pipeline if this does not fail the connect promise.
1261                             fireExceptionEvents(conn, e);
1262                         }
1263                     }
1264                 } else {
1265                     done = false;
1266                 }
1267                 int size = bufferList.size();
1268                 if (done) {
1269                     // We are done, release the buffer and send what we did build up so far.
1270                     out.release();
1271 
1272                     switch (size) {
1273                         case 0:
1274                             // Nothing more to write.
1275                             break;
1276                         case 1:
1277                             // We can write a normal datagram packet.
1278                             parent().write(new DatagramPacket(bufferList.get(0), sendToAddress));
1279                             sendResult = SendResult.SOME;
1280                             break;
1281                         default:
1282                             int segmentSize = segmentSize(bufferList);
1283                             ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
1284                             // We had more than one buffer, create a segmented packet.
1285                             parent().write(segmentedDatagramPacketAllocator.newPacket(
1286                                     compositeBuffer, segmentSize, sendToAddress));
1287                             sendResult = SendResult.SOME;
1288                             break;
1289                     }
1290                     bufferList.clear();
1291                     if (close) {
1292                         sendResult = SendResult.CLOSE;
1293                     }
1294                     return sendResult;
1295                 }
1296                 out.writerIndex(writerIndex + written);
1297 
1298                 int segmentSize = -1;
1299                 if (conn.isSendInfoChanged()) {
1300                     // Change the cached address and let the user know there was a connection migration.
1301                     remote = QuicheSendInfo.getToAddress(sendInfo);
1302                     local = QuicheSendInfo.getFromAddress(sendInfo);
1303 
1304                     if (size > 0) {
1305                         // We have something in the out list already, we need to send this now and so we set the
1306                         // segmentSize.
1307                         segmentSize = segmentSize(bufferList);
1308                     }
1309                 } else if (size > 0) {
1310                     int lastReadable = segmentSize(bufferList);
1311                     // Check if we either need to send now because the last buffer we added has a smaller size then this
1312                     // one or if we reached the maximum number of segments that we can send.
1313                     if (lastReadable != out.readableBytes() ||
1314                             size == segmentedDatagramPacketAllocator.maxNumSegments()) {
1315                         segmentSize = lastReadable;
1316                     }
1317                 }
1318 
1319                 // If the segmentSize is not -1 we know we need to send now what was in the out list.
1320                 if (segmentSize != -1) {
1321                     final boolean stop;
1322                     if (size == 1) {
1323                         // Only one buffer in the out list, there is no need to use segments.
1324                         stop = writePacket(new DatagramPacket(
1325                                 bufferList.get(0), sendToAddress), maxDatagramSize, len);
1326                     } else {
1327                         // Create a packet with segments in.
1328                         ByteBuf compositeBuffer = Unpooled.wrappedBuffer(bufferList.toArray(new ByteBuf[0]));
1329                         stop = writePacket(segmentedDatagramPacketAllocator.newPacket(
1330                                 compositeBuffer, segmentSize, sendToAddress), maxDatagramSize, len);
1331                     }
1332                     bufferList.clear();
1333                     sendResult = SendResult.SOME;
1334 
1335                     if (stop) {
1336                         // Nothing left in the window, continue later. That said we still need to also
1337                         // write the previous filled out buffer as otherwise we would either leak or need
1338                         // to drop it and so produce some loss.
1339                         if (out.isReadable()) {
1340                             parent().write(new DatagramPacket(out, sendToAddress));
1341                         } else {
1342                             out.release();
1343                         }
1344                         if (close) {
1345                             sendResult = SendResult.CLOSE;
1346                         }
1347                         return sendResult;
1348                     }
1349                 }
1350                 // Let's add a touch with the bufferList as a hint. This will help us to debug leaks if there
1351                 // are any.
1352                 out.touch(bufferList);
1353                 // store for later, so we can make use of segments.
1354                 bufferList.add(out);
1355             }
1356         } finally {
1357             // NOOP
1358         }
1359     }
1360 
1361     private static int segmentSize(List<ByteBuf> bufferList) {
1362         assert !bufferList.isEmpty();
1363         int size = bufferList.size();
1364         return bufferList.get(size - 1).readableBytes();
1365     }
1366 
1367     private SendResult connectionSendSimple(QuicheQuicConnection conn) {
1368         if (conn.isClosed()) {
1369             return SendResult.NONE;
1370         }
1371         long connAddr = conn.address();
1372         SendResult sendResult = SendResult.NONE;
1373         boolean close = false;
1374         int maxDatagramSize = Quiche.quiche_conn_max_send_udp_payload_size(connAddr);
1375         for (;;) {
1376             ByteBuffer sendInfo = conn.nextSendInfo();
1377 
1378             int len = calculateSendBufferLength(connAddr, maxDatagramSize);
1379             ByteBuf out = alloc().directBuffer(len);
1380             int writerIndex = out.writerIndex();
1381 
1382             int written = Quiche.quiche_conn_send(
1383                     connAddr, Quiche.writerMemoryAddress(out), out.writableBytes(),
1384                     Quiche.memoryAddressWithPosition(sendInfo));
1385 
1386             if (written == 0) {
1387                 // No need to create a new datagram packet. Just release and try again.
1388                 out.release();
1389                 continue;
1390             }
1391             if (written < 0) {
1392                 out.release();
1393                 if (written != Quiche.QUICHE_ERR_DONE) {
1394                     close = Quiche.shouldClose(written);
1395 
1396                     Exception e = Quiche.convertToException(written);
1397                     if (!tryFailConnectPromise(e)) {
1398                         fireExceptionEvents(conn, e);
1399                     }
1400                 }
1401                 break;
1402             }
1403             if (conn.isSendInfoChanged()) {
1404                 // Change the cached address
1405                 remote = QuicheSendInfo.getToAddress(sendInfo);
1406                 local = QuicheSendInfo.getFromAddress(sendInfo);
1407             }
1408             out.writerIndex(writerIndex + written);
1409             boolean stop = writePacket(new DatagramPacket(out, remote), maxDatagramSize, len);
1410             sendResult = SendResult.SOME;
1411             if (stop) {
1412                 // Nothing left in the window, continue later
1413                 break;
1414             }
1415         }
1416         if (close) {
1417             sendResult = SendResult.CLOSE;
1418         }
1419         return sendResult;
1420     }
1421 
1422     private boolean writePacket(DatagramPacket packet, int maxDatagramSize, int len) {
1423         ChannelFuture future = parent().write(packet);
1424         if (isSendWindowUsed(maxDatagramSize, len)) {
1425             // Nothing left in the window, continue later
1426             future.addListener(continueSendingListener);
1427             return true;
1428         }
1429         return false;
1430     }
1431 
1432     private static boolean isSendWindowUsed(int maxDatagramSize, int len) {
1433         return len < maxDatagramSize;
1434     }
1435 
1436     private static int calculateSendBufferLength(long connAddr, int maxDatagramSize) {
1437         int len = Math.min(maxDatagramSize, Quiche.quiche_conn_send_quantum(connAddr));
1438         if (len <= 0) {
1439             // If there is no room left we just return some small number to reduce the risk of packet drop
1440             // while still be able to attach the listener to the write future.
1441             // We use the value of 8 because such an allocation will be cheap to serve from the
1442             // PooledByteBufAllocator while still serve our need.
1443             return 8;
1444         }
1445         return len;
1446     }
1447 
1448     /**
1449      * Write datagrams if needed and return {@code true} if something was written and we need to call
1450      * {@link Channel#flush()} at some point.
1451      */
1452     private SendResult connectionSend(QuicheQuicConnection conn) {
1453         if (conn.isFreed()) {
1454             return SendResult.NONE;
1455         }
1456         if ((reantranceGuard & IN_CONNECTION_SEND) != 0) {
1457             // Let's notify about early data if needed.
1458             notifyEarlyDataReadyIfNeeded(conn);
1459             return SendResult.NONE;
1460         }
1461 
1462         reantranceGuard |= IN_CONNECTION_SEND;
1463         try {
1464             SendResult sendResult;
1465             SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator =
1466                     config.getSegmentedDatagramPacketAllocator();
1467             if (segmentedDatagramPacketAllocator.maxNumSegments() > 0) {
1468                 sendResult = connectionSendSegments(conn, segmentedDatagramPacketAllocator);
1469             } else {
1470                 sendResult = connectionSendSimple(conn);
1471             }
1472 
1473             // Process / schedule all tasks that were created.
1474 
1475             Runnable task = conn.sslTask();
1476             if (task != null) {
1477                 if (runTasksDirectly()) {
1478                     // Consume all tasks
1479                     do {
1480                         task.run();
1481                         // Notify about early data ready if needed.
1482                         notifyEarlyDataReadyIfNeeded(conn);
1483                     } while ((task = conn.sslTask()) != null);
1484 
1485                     // Let's try again sending after we did process all tasks.
1486                     // We schedule this on the EventLoop as otherwise we will get into trouble with re-entrance.
1487                     eventLoop().execute(new Runnable() {
1488                         @Override
1489                         public void run() {
1490                             // Call connection send to continue handshake if needed.
1491                             if (connectionSend(conn) != SendResult.NONE) {
1492                                 forceFlushParent();
1493                             }
1494                             freeIfClosed();
1495                         }
1496                     });
1497                 } else {
1498                     runAllTaskSend(conn, task);
1499                 }
1500             } else {
1501                 // Notify about early data ready if needed.
1502                 notifyEarlyDataReadyIfNeeded(conn);
1503             }
1504 
1505             // Whenever we called connection_send we should also schedule the timer if needed.
1506             timeoutHandler.scheduleTimeout();
1507             return sendResult;
1508         } finally {
1509             reantranceGuard &= ~IN_CONNECTION_SEND;
1510         }
1511     }
1512 
1513     private final class QuicChannelUnsafe extends AbstractChannel.AbstractUnsafe {
1514 
1515         void connectStream(QuicStreamType type, @Nullable ChannelHandler handler,
1516                            Promise<QuicStreamChannel> promise) {
1517             if (!promise.setUncancellable()) {
1518                 return;
1519             }
1520             long streamId = idGenerator.nextStreamId(type == QuicStreamType.BIDIRECTIONAL);
1521 
1522             try {
1523                 int res = streamSend0(connection, streamId, Unpooled.EMPTY_BUFFER, false);
1524                 if (res < 0 && res != Quiche.QUICHE_ERR_DONE) {
1525                     throw Quiche.convertToException(res);
1526                 }
1527             } catch (Exception e) {
1528                 promise.setFailure(e);
1529                 return;
1530             }
1531             if (type == QuicStreamType.UNIDIRECTIONAL) {
1532                 UNI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1533             } else {
1534                 BIDI_STREAMS_LEFT_UPDATER.decrementAndGet(QuicheQuicChannel.this);
1535             }
1536             QuicheQuicStreamChannel streamChannel = addNewStreamChannel(streamId);
1537             if (handler != null) {
1538                 streamChannel.pipeline().addLast(handler);
1539             }
1540             eventLoop().register(streamChannel).addListener((ChannelFuture f) -> {
1541                 if (f.isSuccess()) {
1542                     promise.setSuccess(streamChannel);
1543                 } else {
1544                     promise.setFailure(f.cause());
1545                     streams.remove(streamId);
1546                 }
1547             });
1548         }
1549 
1550         @Override
1551         public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
1552             assert eventLoop().inEventLoop();
1553             if (!channelPromise.setUncancellable()) {
1554                 return;
1555             }
1556             if (server) {
1557                 channelPromise.setFailure(new UnsupportedOperationException());
1558                 return;
1559             }
1560 
1561             if (connectPromise != null) {
1562                 channelPromise.setFailure(new ConnectionPendingException());
1563                 return;
1564             }
1565 
1566             if (remote instanceof QuicConnectionAddress) {
1567                 if (!sourceConnectionIds.isEmpty()) {
1568                     // If a key is assigned we know this channel was already connected.
1569                     channelPromise.setFailure(new AlreadyConnectedException());
1570                     return;
1571                 }
1572 
1573                 connectLocalAddress = (QuicConnectionAddress) local;
1574                 connectRemoteAddress = (QuicConnectionAddress) remote;
1575                 connectPromise = channelPromise;
1576 
1577                 // Schedule connect timeout.
1578                 int connectTimeoutMillis = config().getConnectTimeoutMillis();
1579                 if (connectTimeoutMillis > 0) {
1580                     connectTimeoutFuture = eventLoop().schedule(() -> {
1581                         ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1582                         if (connectPromise != null && !connectPromise.isDone()
1583                                 && connectPromise.tryFailure(new ConnectTimeoutException(
1584                                 "connection timed out: " + local + " -> " +  remote))) {
1585                             close(voidPromise());
1586                         }
1587                     }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1588                 }
1589 
1590                 connectPromise.addListener((ChannelFuture future) -> {
1591                     if (future.isCancelled()) {
1592                         if (connectTimeoutFuture != null) {
1593                             connectTimeoutFuture.cancel(false);
1594                         }
1595                         connectPromise = null;
1596                         close(voidPromise());
1597                     }
1598                 });
1599 
1600                 parent().connect(new QuicheQuicChannelAddress(QuicheQuicChannel.this))
1601                         .addListener(f -> {
1602                             ChannelPromise connectPromise = QuicheQuicChannel.this.connectPromise;
1603                             if (connectPromise != null && !f.isSuccess()) {
1604                                 connectPromise.tryFailure(f.cause());
1605                                 // close everything after notify about failure.
1606                                 unsafe().closeForcibly();
1607                             }
1608                         });
1609                 return;
1610             }
1611 
1612             channelPromise.setFailure(new UnsupportedOperationException());
1613         }
1614 
1615         private void fireConnectCloseEventIfNeeded(QuicheQuicConnection conn) {
1616             if (connectionCloseEvent == null && !conn.isFreed()) {
1617                 connectionCloseEvent = Quiche.quiche_conn_peer_error(conn.address());
1618                 if (connectionCloseEvent != null) {
1619                     pipeline().fireUserEventTriggered(connectionCloseEvent);
1620                 }
1621             }
1622         }
1623 
1624         void connectionRecv(InetSocketAddress sender, InetSocketAddress recipient, ByteBuf buffer) {
1625             QuicheQuicConnection conn = QuicheQuicChannel.this.connection;
1626             if (conn.isFreed()) {
1627                 return;
1628             }
1629             int bufferReadable = buffer.readableBytes();
1630             if (bufferReadable == 0) {
1631                 // Nothing to do here. Just return...
1632                 // See also https://github.com/cloudflare/quiche/issues/817
1633                 return;
1634             }
1635 
1636             reantranceGuard |= IN_RECV;
1637             boolean close = false;
1638             try {
1639                 ByteBuf tmpBuffer = null;
1640                 // We need to make a copy if the buffer is read only as recv(...) may modify the input buffer as well.
1641                 // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.recv
1642                 if (buffer.isReadOnly()) {
1643                     tmpBuffer = alloc().directBuffer(buffer.readableBytes());
1644                     tmpBuffer.writeBytes(buffer);
1645                     buffer = tmpBuffer;
1646                 }
1647                 long memoryAddress = Quiche.readerMemoryAddress(buffer);
1648 
1649                 ByteBuffer recvInfo = conn.nextRecvInfo();
1650                 QuicheRecvInfo.setRecvInfo(recvInfo, sender, recipient);
1651 
1652                 remote = sender;
1653                 local = recipient;
1654 
1655                 try {
1656                     do  {
1657                         // Call quiche_conn_recv(...) until we consumed all bytes or we did receive some error.
1658                         int res = Quiche.quiche_conn_recv(conn.address(), memoryAddress, bufferReadable,
1659                                 Quiche.memoryAddressWithPosition(recvInfo));
1660                         final boolean done;
1661                         if (res < 0) {
1662                             done = true;
1663                             if (res != Quiche.QUICHE_ERR_DONE) {
1664                                 close = Quiche.shouldClose(res);
1665                                 Exception e = Quiche.convertToException(res);
1666                                 if (tryFailConnectPromise(e)) {
1667                                     break;
1668                                 }
1669                                 fireExceptionEvents(conn, e);
1670                             }
1671                         } else {
1672                             done = false;
1673                         }
1674                         // Process / schedule all tasks that were created.
1675                         Runnable task = conn.sslTask();
1676                         if (task != null) {
1677                             if (runTasksDirectly()) {
1678                                 // Consume all tasks
1679                                 do {
1680                                     task.run();
1681                                 } while ((task = conn.sslTask()) != null);
1682                                 processReceived(conn);
1683                             } else {
1684                                 runAllTaskRecv(conn, task);
1685                             }
1686                         } else {
1687                             processReceived(conn);
1688                         }
1689 
1690                         if (done) {
1691                             break;
1692                         }
1693                         memoryAddress += res;
1694                         bufferReadable -= res;
1695                     } while (bufferReadable > 0 && !conn.isFreed());
1696                 } finally {
1697                     buffer.skipBytes((int) (memoryAddress - Quiche.readerMemoryAddress(buffer)));
1698                     if (tmpBuffer != null) {
1699                         tmpBuffer.release();
1700                     }
1701                 }
1702                 if (close) {
1703                     // Let's close now as there is no way to recover
1704                     unsafe().close(newPromise());
1705                 }
1706             } finally {
1707                 reantranceGuard &= ~IN_RECV;
1708             }
1709         }
1710 
1711         private void processReceived(QuicheQuicConnection conn) {
1712             // Handle pending channelActive if needed.
1713             if (handlePendingChannelActive(conn)) {
1714                 // Connection was closed right away.
1715                 return;
1716             }
1717 
1718             notifyAboutHandshakeCompletionIfNeeded(conn, null);
1719             fireConnectCloseEventIfNeeded(conn);
1720 
1721             if (conn.isFreed()) {
1722                 return;
1723             }
1724 
1725             long connAddr = conn.address();
1726             if (Quiche.quiche_conn_is_established(connAddr) ||
1727                     Quiche.quiche_conn_is_in_early_data(connAddr)) {
1728                 long uniLeftOld = uniStreamsLeft;
1729                 long bidiLeftOld = bidiStreamsLeft;
1730                 // Only fetch new stream info when we used all our credits
1731                 if (uniLeftOld == 0 || bidiLeftOld == 0) {
1732                     long uniLeft = Quiche.quiche_conn_peer_streams_left_uni(connAddr);
1733                     long bidiLeft = Quiche.quiche_conn_peer_streams_left_bidi(connAddr);
1734                     uniStreamsLeft = uniLeft;
1735                     bidiStreamsLeft = bidiLeft;
1736                     if (uniLeftOld != uniLeft || bidiLeftOld != bidiLeft) {
1737                         pipeline().fireUserEventTriggered(QuicStreamLimitChangedEvent.INSTANCE);
1738                     }
1739                 }
1740 
1741                 handlePathEvents(conn);
1742 
1743                 if (handleWritableStreams(conn)) {
1744                     // Some data was produced, let's flush.
1745                     flushParent();
1746                 }
1747 
1748                 datagramReadable = true;
1749                 streamReadable = true;
1750 
1751                 recvDatagram(conn);
1752                 recvStream(conn);
1753             }
1754         }
1755 
1756         private void handlePathEvents(QuicheQuicConnection conn) {
1757             long event;
1758             while (!conn.isFreed() && (event = Quiche.quiche_conn_path_event_next(conn.address())) > 0) {
1759                 try {
1760                     int type = Quiche.quiche_path_event_type(event);
1761 
1762                     if (type == Quiche.QUICHE_PATH_EVENT_NEW) {
1763                         Object[] ret = Quiche.quiche_path_event_new(event);
1764                         InetSocketAddress local = (InetSocketAddress) ret[0];
1765                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1766                         pipeline().fireUserEventTriggered(new QuicPathEvent.New(local, peer));
1767                     } else if (type == Quiche.QUICHE_PATH_EVENT_VALIDATED) {
1768                         Object[] ret = Quiche.quiche_path_event_validated(event);
1769                         InetSocketAddress local = (InetSocketAddress) ret[0];
1770                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1771                         pipeline().fireUserEventTriggered(new QuicPathEvent.Validated(local, peer));
1772                     } else if (type == Quiche.QUICHE_PATH_EVENT_FAILED_VALIDATION) {
1773                         Object[] ret = Quiche.quiche_path_event_failed_validation(event);
1774                         InetSocketAddress local = (InetSocketAddress) ret[0];
1775                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1776                         pipeline().fireUserEventTriggered(new QuicPathEvent.FailedValidation(local, peer));
1777                     } else if (type == Quiche.QUICHE_PATH_EVENT_CLOSED) {
1778                         Object[] ret = Quiche.quiche_path_event_closed(event);
1779                         InetSocketAddress local = (InetSocketAddress) ret[0];
1780                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1781                         pipeline().fireUserEventTriggered(new QuicPathEvent.Closed(local, peer));
1782                     } else if (type == Quiche.QUICHE_PATH_EVENT_REUSED_SOURCE_CONNECTION_ID) {
1783                         Object[] ret = Quiche.quiche_path_event_reused_source_connection_id(event);
1784                         Long seq = (Long) ret[0];
1785                         InetSocketAddress localOld = (InetSocketAddress) ret[1];
1786                         InetSocketAddress peerOld = (InetSocketAddress) ret[2];
1787                         InetSocketAddress local = (InetSocketAddress) ret[3];
1788                         InetSocketAddress peer = (InetSocketAddress) ret[4];
1789                         pipeline().fireUserEventTriggered(
1790                                 new QuicPathEvent.ReusedSourceConnectionId(seq, localOld, peerOld, local, peer));
1791                     } else if (type == Quiche.QUICHE_PATH_EVENT_PEER_MIGRATED) {
1792                         Object[] ret = Quiche.quiche_path_event_peer_migrated(event);
1793                         InetSocketAddress local = (InetSocketAddress) ret[0];
1794                         InetSocketAddress peer = (InetSocketAddress) ret[1];
1795                         pipeline().fireUserEventTriggered(new QuicPathEvent.PeerMigrated(local, peer));
1796                     }
1797                 } finally {
1798                     Quiche.quiche_path_event_free(event);
1799                 }
1800             }
1801         }
1802 
1803         private void runAllTaskRecv(QuicheQuicConnection conn, Runnable task) {
1804             sslTaskExecutor.execute(decorateTaskRecv(conn, task));
1805         }
1806 
1807         private Runnable decorateTaskRecv(QuicheQuicConnection conn, Runnable task) {
1808             return () -> {
1809                 try {
1810                     runAll(conn, task);
1811                 } finally {
1812                     // Move back to the EventLoop.
1813                     eventLoop().execute(() -> {
1814                         if (!conn.isFreed()) {
1815                             processReceived(conn);
1816 
1817                             // Call connection send to continue handshake if needed.
1818                             if (connectionSend(conn) != SendResult.NONE) {
1819                                 forceFlushParent();
1820                             }
1821 
1822                             freeIfClosed();
1823                         }
1824                     });
1825                 }
1826             };
1827         }
1828         void recv() {
1829             QuicheQuicConnection conn = connection;
1830             if ((reantranceGuard & IN_RECV) != 0 || conn.isFreed()) {
1831                 return;
1832             }
1833 
1834             long connAddr = conn.address();
1835             // Check if we can read anything yet.
1836             if (!Quiche.quiche_conn_is_established(connAddr) &&
1837                     !Quiche.quiche_conn_is_in_early_data(connAddr)) {
1838                 return;
1839             }
1840 
1841             reantranceGuard |= IN_RECV;
1842             try {
1843                 recvDatagram(conn);
1844                 recvStream(conn);
1845             } finally {
1846                 fireChannelReadCompleteIfNeeded();
1847                 reantranceGuard &= ~IN_RECV;
1848             }
1849         }
1850 
1851         private void recvStream(QuicheQuicConnection conn) {
1852             if (conn.isFreed()) {
1853                 return;
1854             }
1855             long connAddr = conn.address();
1856             long readableIterator = Quiche.quiche_conn_readable(connAddr);
1857             int totalReadable = 0;
1858             if (readableIterator != -1) {
1859                 try {
1860                     // For streams we always process all streams when at least on read was requested.
1861                     if (recvStreamPending && streamReadable) {
1862                         for (;;) {
1863                             int readable = Quiche.quiche_stream_iter_next(
1864                                     readableIterator, readableStreams);
1865                             for (int i = 0; i < readable; i++) {
1866                                 long streamId = readableStreams[i];
1867                                 QuicheQuicStreamChannel streamChannel = streams.get(streamId);
1868                                 if (streamChannel == null) {
1869                                     recvStreamPending = false;
1870                                     fireChannelReadCompletePending = true;
1871                                     streamChannel = addNewStreamChannel(streamId);
1872                                     streamChannel.readable();
1873                                     pipeline().fireChannelRead(streamChannel);
1874                                 } else {
1875                                     streamChannel.readable();
1876                                 }
1877                             }
1878                             if (readable < readableStreams.length) {
1879                                 // We did consume all readable streams.
1880                                 streamReadable = false;
1881                                 break;
1882                             }
1883                             if (readable > 0) {
1884                                 totalReadable += readable;
1885                             }
1886                         }
1887                     }
1888                 } finally {
1889                     Quiche.quiche_stream_iter_free(readableIterator);
1890                 }
1891                 readableStreams = growIfNeeded(readableStreams, totalReadable);
1892             }
1893         }
1894 
1895         private void recvDatagram(QuicheQuicConnection conn) {
1896             if (!supportsDatagram) {
1897                 return;
1898             }
1899             while (recvDatagramPending && datagramReadable && !conn.isFreed()) {
1900                 @SuppressWarnings("deprecation")
1901                 RecvByteBufAllocator.Handle recvHandle = recvBufAllocHandle();
1902                 recvHandle.reset(config());
1903 
1904                 int numMessagesRead = 0;
1905                 do {
1906                     long connAddr = conn.address();
1907                     int len = Quiche.quiche_conn_dgram_recv_front_len(connAddr);
1908                     if (len == Quiche.QUICHE_ERR_DONE) {
1909                         datagramReadable = false;
1910                         return;
1911                     }
1912 
1913                     ByteBuf datagramBuffer = alloc().directBuffer(len);
1914                     recvHandle.attemptedBytesRead(datagramBuffer.writableBytes());
1915                     int writerIndex = datagramBuffer.writerIndex();
1916                     long memoryAddress = Quiche.writerMemoryAddress(datagramBuffer);
1917 
1918                     int written = Quiche.quiche_conn_dgram_recv(connAddr,
1919                             memoryAddress, datagramBuffer.writableBytes());
1920                     if (written < 0) {
1921                         datagramBuffer.release();
1922                         if (written == Quiche.QUICHE_ERR_DONE) {
1923                             // We did consume all datagram packets.
1924                             datagramReadable = false;
1925                             break;
1926                         }
1927                         pipeline().fireExceptionCaught(Quiche.convertToException(written));
1928                     }
1929                     recvHandle.lastBytesRead(written);
1930                     recvHandle.incMessagesRead(1);
1931                     numMessagesRead++;
1932                     datagramBuffer.writerIndex(writerIndex + written);
1933                     recvDatagramPending = false;
1934                     fireChannelReadCompletePending = true;
1935 
1936                     pipeline().fireChannelRead(datagramBuffer);
1937                 } while (recvHandle.continueReading() && !conn.isFreed());
1938                 recvHandle.readComplete();
1939 
1940                 // Check if we produced any messages.
1941                 if (numMessagesRead > 0) {
1942                     fireChannelReadCompleteIfNeeded();
1943                 }
1944             }
1945         }
1946 
1947         private boolean handlePendingChannelActive(QuicheQuicConnection conn) {
1948             if (conn.isFreed() || state == ChannelState.CLOSED) {
1949                 return true;
1950             }
1951             if (server) {
1952                 if (state == ChannelState.OPEN && Quiche.quiche_conn_is_established(conn.address())) {
1953                     // We didn't notify before about channelActive... Update state and fire the event.
1954                     state = ChannelState.ACTIVE;
1955 
1956                     fireDatagramExtensionEvent(conn);
1957                     pipeline().fireChannelActive();
1958                     notifyAboutHandshakeCompletionIfNeeded(conn, null);
1959                 }
1960             } else if (connectPromise != null && Quiche.quiche_conn_is_established(conn.address())) {
1961                 ChannelPromise promise = connectPromise;
1962                 connectPromise = null;
1963                 state = ChannelState.ACTIVE;
1964 
1965                 boolean promiseSet = promise.trySuccess();
1966                 fireDatagramExtensionEvent(conn);
1967                 pipeline().fireChannelActive();
1968                 notifyAboutHandshakeCompletionIfNeeded(conn, null);
1969                 if (!promiseSet) {
1970                     fireConnectCloseEventIfNeeded(conn);
1971                     this.close(this.voidPromise());
1972                     return true;
1973                 }
1974             }
1975             return false;
1976         }
1977 
1978         private void fireDatagramExtensionEvent(QuicheQuicConnection conn) {
1979             if (conn.isClosed()) {
1980                 return;
1981             }
1982             long connAddr = conn.address();
1983             int len = Quiche.quiche_conn_dgram_max_writable_len(connAddr);
1984             // QUICHE_ERR_DONE means the remote peer does not support the extension.
1985             if (len != Quiche.QUICHE_ERR_DONE) {
1986                 pipeline().fireUserEventTriggered(new QuicDatagramExtensionEvent(len));
1987             }
1988         }
1989 
1990         private QuicheQuicStreamChannel addNewStreamChannel(long streamId) {
1991             QuicheQuicStreamChannel streamChannel = new QuicheQuicStreamChannel(
1992                     QuicheQuicChannel.this, streamId);
1993             QuicheQuicStreamChannel old = streams.put(streamId, streamChannel);
1994             assert old == null;
1995             streamChannel.writable(streamCapacity(streamId));
1996             return streamChannel;
1997         }
1998     }
1999 
2000     /**
2001      * Finish the connect operation of a client channel.
2002      */
2003     void finishConnect() {
2004         assert !server;
2005         assert connection != null;
2006         if (connectionSend(connection) != SendResult.NONE) {
2007             flushParent();
2008         }
2009     }
2010 
2011     private void notifyEarlyDataReadyIfNeeded(QuicheQuicConnection conn) {
2012         if (!server && !earlyDataReadyNotified &&
2013                 !conn.isFreed() && Quiche.quiche_conn_is_in_early_data(conn.address())) {
2014             earlyDataReadyNotified = true;
2015             pipeline().fireUserEventTriggered(SslEarlyDataReadyEvent.INSTANCE);
2016         }
2017     }
2018 
2019     private final class TimeoutHandler implements Runnable {
2020         private ScheduledFuture<?> timeoutFuture;
2021 
2022         @Override
2023         public void run() {
2024             QuicheQuicConnection conn = connection;
2025             if (conn.isFreed()) {
2026                 return;
2027             }
2028             if (!freeIfClosed()) {
2029                 long connAddr = conn.address();
2030                 timeoutFuture = null;
2031                 // Notify quiche there was a timeout.
2032                 Quiche.quiche_conn_on_timeout(connAddr);
2033                 if (!freeIfClosed()) {
2034                     // We need to call connectionSend when a timeout was triggered.
2035                     // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send.
2036                     if (connectionSend(conn) != SendResult.NONE) {
2037                         flushParent();
2038                     }
2039                     boolean closed = freeIfClosed();
2040                     if (!closed) {
2041                         // The connection is alive, reschedule.
2042                         scheduleTimeout();
2043                     }
2044                 }
2045             }
2046         }
2047 
2048         // Schedule timeout.
2049         // See https://docs.rs/quiche/0.6.0/quiche/#generating-outgoing-packets
2050         void scheduleTimeout() {
2051             QuicheQuicConnection conn = connection;
2052             if (conn.isFreed()) {
2053                 cancel();
2054                 return;
2055             }
2056             if (conn.isClosed()) {
2057                 cancel();
2058                 unsafe().close(newPromise());
2059                 return;
2060             }
2061             long nanos = Quiche.quiche_conn_timeout_as_nanos(conn.address());
2062             if (nanos < 0 || nanos == Long.MAX_VALUE) {
2063                 // No timeout needed.
2064                 cancel();
2065                 return;
2066             }
2067             if (timeoutFuture == null) {
2068                 timeoutFuture = eventLoop().schedule(this,
2069                         nanos, TimeUnit.NANOSECONDS);
2070             } else {
2071                 long remaining = timeoutFuture.getDelay(TimeUnit.NANOSECONDS);
2072                 if (remaining <= 0) {
2073                     // This means the timer already elapsed. In this case just cancel the future and call run()
2074                     // directly. This will ensure we correctly call quiche_conn_on_timeout() etc.
2075                     cancel();
2076                     run();
2077                 } else if (remaining > nanos) {
2078                     // The new timeout is smaller then what was scheduled before. Let's cancel the old timeout
2079                     // and schedule a new one.
2080                     cancel();
2081                     timeoutFuture = eventLoop().schedule(this, nanos, TimeUnit.NANOSECONDS);
2082                 }
2083             }
2084         }
2085 
2086         void cancel() {
2087             if (timeoutFuture != null) {
2088                 timeoutFuture.cancel(false);
2089                 timeoutFuture = null;
2090             }
2091         }
2092     }
2093 
2094     @Override
2095     public Future<QuicConnectionStats> collectStats(Promise<QuicConnectionStats> promise) {
2096         if (eventLoop().inEventLoop()) {
2097             collectStats0(promise);
2098         } else {
2099             eventLoop().execute(() -> collectStats0(promise));
2100         }
2101         return promise;
2102     }
2103 
2104     private void collectStats0(Promise<QuicConnectionStats> promise) {
2105         QuicheQuicConnection conn = connection;
2106         if (conn.isFreed()) {
2107             promise.setSuccess(statsAtClose);
2108             return;
2109         }
2110 
2111         collectStats0(connection, promise);
2112     }
2113 
2114     @Nullable
2115     private QuicConnectionStats collectStats0(QuicheQuicConnection connection, Promise<QuicConnectionStats> promise) {
2116         final long[] stats = Quiche.quiche_conn_stats(connection.address());
2117         if (stats == null) {
2118             promise.setFailure(new IllegalStateException("native quiche_conn_stats(...) failed"));
2119             return null;
2120         }
2121 
2122         final QuicheQuicConnectionStats connStats =
2123             new QuicheQuicConnectionStats(stats);
2124         promise.setSuccess(connStats);
2125         return connStats;
2126     }
2127 
2128     @Override
2129     public Future<QuicConnectionPathStats> collectPathStats(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2130         if (eventLoop().inEventLoop()) {
2131             collectPathStats0(pathIdx, promise);
2132         } else {
2133             eventLoop().execute(() -> collectPathStats0(pathIdx, promise));
2134         }
2135         return promise;
2136     }
2137 
2138     private void collectPathStats0(int pathIdx, Promise<QuicConnectionPathStats> promise) {
2139         QuicheQuicConnection conn = connection;
2140         if (conn.isFreed()) {
2141             promise.setFailure(new IllegalStateException("Connection is closed"));
2142             return;
2143         }
2144 
2145         final Object[] stats = Quiche.quiche_conn_path_stats(connection.address(), pathIdx);
2146         if (stats == null) {
2147             promise.setFailure(new IllegalStateException("native quiche_conn_path_stats(...) failed"));
2148             return;
2149         }
2150         promise.setSuccess(new QuicheQuicConnectionPathStats(stats));
2151     }
2152 
2153     @Override
2154     public QuicTransportParameters peerTransportParameters() {
2155         return connection.peerParameters();
2156     }
2157 }