View Javadoc
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http3;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandler;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelOutboundHandler;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.PendingWriteQueue;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.quic.QuicStreamChannel;
27  import io.netty.handler.codec.quic.QuicStreamFrame;
28  import io.netty.util.ReferenceCountUtil;
29  import io.netty.util.concurrent.Future;
30  import io.netty.util.concurrent.GenericFutureListener;
31  import org.jetbrains.annotations.Nullable;
32  
33  import java.net.SocketAddress;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.function.BiFunction;
37  
38  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_CANCEL_PUSH_FRAME_MAX_LEN;
39  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_CANCEL_PUSH_FRAME_TYPE;
40  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_DATA_FRAME_TYPE;
41  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_GO_AWAY_FRAME_MAX_LEN;
42  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_GO_AWAY_FRAME_TYPE;
43  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_HEADERS_FRAME_TYPE;
44  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_MAX_PUSH_ID_FRAME_MAX_LEN;
45  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_MAX_PUSH_ID_FRAME_TYPE;
46  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_PUSH_PROMISE_FRAME_TYPE;
47  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_SETTINGS_FRAME_MAX_LEN;
48  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_SETTINGS_FRAME_TYPE;
49  import static io.netty.handler.codec.http3.Http3CodecUtils.numBytesForVariableLengthInteger;
50  import static io.netty.handler.codec.http3.Http3CodecUtils.readVariableLengthInteger;
51  import static io.netty.handler.codec.http3.Http3CodecUtils.writeVariableLengthInteger;
52  import static io.netty.util.internal.ObjectUtil.checkNotNull;
53  import static io.netty.util.internal.ObjectUtil.checkPositive;
54  
55  /**
56   * Decodes / encodes {@link Http3Frame}s.
57   */
58  final class Http3FrameCodec extends ByteToMessageDecoder implements ChannelOutboundHandler {
59      private final Http3FrameTypeValidator validator;
60      private final long maxHeaderListSize;
61      private final QpackDecoder qpackDecoder;
62      private final QpackEncoder qpackEncoder;
63      private final Http3RequestStreamCodecState encodeState;
64      private final Http3RequestStreamCodecState decodeState;
65      private final Http3Settings.NonStandardHttp3SettingsValidator nonStandardSettingsValidator;
66      private boolean firstFrame = true;
67      private boolean error;
68      private long type = -1;
69      private int payLoadLength = -1;
70      private QpackAttributes qpackAttributes;
71      private ReadResumptionListener readResumptionListener;
72      private WriteResumptionListener writeResumptionListener;
73  
74      static Http3FrameCodecFactory newFactory(QpackDecoder qpackDecoder,
75                                               long maxHeaderListSize, QpackEncoder qpackEncoder) {
76          checkNotNull(qpackEncoder, "qpackEncoder");
77          checkNotNull(qpackDecoder, "qpackDecoder");
78  
79          // QPACK decoder and encoder are shared between streams in a connection.
80          return (validator, encodeState, decodeState,
81                  nonStandardSettingsValidator) -> new Http3FrameCodec(validator, qpackDecoder,
82                  maxHeaderListSize, qpackEncoder, encodeState, decodeState, nonStandardSettingsValidator);
83      }
84  
85      Http3FrameCodec(Http3FrameTypeValidator validator, QpackDecoder qpackDecoder,
86                      long maxHeaderListSize, QpackEncoder qpackEncoder, Http3RequestStreamCodecState encodeState,
87                      Http3RequestStreamCodecState decodeState,
88                      Http3Settings.NonStandardHttp3SettingsValidator nonStandardSettingsValidator) {
89          this.validator = checkNotNull(validator, "validator");
90          this.qpackDecoder = checkNotNull(qpackDecoder, "qpackDecoder");
91          this.maxHeaderListSize = checkPositive(maxHeaderListSize, "maxHeaderListSize");
92          this.qpackEncoder = checkNotNull(qpackEncoder, "qpackEncoder");
93          this.encodeState = checkNotNull(encodeState, "encodeState");
94          this.decodeState = checkNotNull(decodeState, "decodeState");
95          this.nonStandardSettingsValidator = nonStandardSettingsValidator;
96      }
97  
98      @Override
99      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
100         qpackAttributes = Http3.getQpackAttributes(ctx.channel().parent());
101         assert qpackAttributes != null;
102 
103         initReadResumptionListenerIfRequired(ctx);
104         super.handlerAdded(ctx);
105     }
106 
107     @Override
108     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
109         if (writeResumptionListener != null) {
110             writeResumptionListener.drain();
111         }
112         super.channelInactive(ctx);
113     }
114 
115     @Override
116     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
117         if (writeResumptionListener != null) {
118             // drain everything so we are sure we never leak anything.
119             writeResumptionListener.drain();
120         }
121         super.handlerRemoved0(ctx);
122     }
123 
124     @Override
125     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
126         ByteBuf buffer;
127         if (msg instanceof QuicStreamFrame) {
128             QuicStreamFrame streamFrame = (QuicStreamFrame) msg;
129             buffer = streamFrame.content().retain();
130             streamFrame.release();
131         } else {
132             buffer = (ByteBuf) msg;
133         }
134         super.channelRead(ctx, buffer);
135     }
136 
137     @Override
138     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
139         assert readResumptionListener != null;
140         if (readResumptionListener.readCompleted()) {
141             super.channelReadComplete(ctx);
142         }
143     }
144 
145     private void connectionError(ChannelHandlerContext ctx, Http3ErrorCode code, String msg, boolean fireException) {
146         error = true;
147         Http3CodecUtils.connectionError(ctx, code, msg, fireException);
148     }
149 
150     private void connectionError(ChannelHandlerContext ctx, Http3Exception exception, boolean fireException) {
151         error = true;
152         Http3CodecUtils.connectionError(ctx, exception, fireException);
153     }
154 
155     @Override
156     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
157         assert readResumptionListener != null;
158         if (!in.isReadable() || readResumptionListener.isSuspended()) {
159             return;
160         }
161         if (error) {
162             in.skipBytes(in.readableBytes());
163             return;
164         }
165         if (type == -1) {
166             int typeLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
167             if (in.readableBytes() < typeLen) {
168                 return;
169             }
170             long localType = readVariableLengthInteger(in, typeLen);
171             if (Http3CodecUtils.isReservedHttp2FrameType(localType)) {
172                 // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.8
173                 connectionError(ctx, Http3ErrorCode.H3_FRAME_UNEXPECTED,
174                         "Reserved type for HTTP/2 received.", true);
175                 return;
176             }
177             try {
178                 // Validate if the type is valid for the current stream first.
179                 validator.validate(localType, firstFrame);
180             } catch (Http3Exception e) {
181                 connectionError(ctx, e, true);
182                 return;
183             }
184             type = localType;
185             firstFrame = false;
186             if (!in.isReadable()) {
187                 return;
188             }
189         }
190         if (payLoadLength == -1) {
191             int payloadLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
192             assert payloadLen <= 8;
193             if (in.readableBytes() < payloadLen) {
194                 return;
195             }
196             long len = readVariableLengthInteger(in, payloadLen);
197             if (len > Integer.MAX_VALUE) {
198                 connectionError(ctx, Http3ErrorCode.H3_EXCESSIVE_LOAD,
199                         "Received an invalid frame len.", true);
200                 return;
201             }
202             payLoadLength = (int) len;
203         }
204         int read = decodeFrame(ctx, type, payLoadLength, in, out);
205         if (read >= 0) {
206             if (read == payLoadLength) {
207                 type = -1;
208                 payLoadLength = -1;
209             } else {
210                 payLoadLength -= read;
211             }
212         }
213     }
214 
215     private static int skipBytes(ByteBuf in, int payLoadLength) {
216         in.skipBytes(payLoadLength);
217         return payLoadLength;
218     }
219 
220     private int decodeFrame(ChannelHandlerContext ctx, long longType, int payLoadLength, ByteBuf in, List<Object> out) {
221         if (longType > Integer.MAX_VALUE && !Http3CodecUtils.isReservedFrameType(longType)) {
222             return skipBytes(in, payLoadLength);
223         }
224         int type = (int) longType;
225         // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-11.2.1
226         switch (type) {
227             case HTTP3_DATA_FRAME_TYPE:
228                 // DATA
229                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.1
230                 int readable = in.readableBytes();
231                 if (readable == 0 && payLoadLength > 0) {
232                     return 0;
233                 }
234                 int length = Math.min(readable, payLoadLength);
235                 out.add(new DefaultHttp3DataFrame(in.readRetainedSlice(length)));
236                 return length;
237             case HTTP3_HEADERS_FRAME_TYPE:
238                 // HEADERS
239                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.2
240                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
241                         // Let's use the maxHeaderListSize as a limit as this is this is the decompressed amounts of
242                         // bytes which means the once we decompressed the headers we will be bigger then the actual
243                         // payload size now.
244                         maxHeaderListSize, Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
245                     return 0;
246                 }
247                 assert qpackAttributes != null;
248                 if (!qpackAttributes.dynamicTableDisabled() && !qpackAttributes.decoderStreamAvailable()) {
249                     assert readResumptionListener != null;
250                     readResumptionListener.suspended();
251                     return 0;
252                 }
253 
254                 Http3HeadersFrame headersFrame = new DefaultHttp3HeadersFrame();
255                 if (decodeHeaders(ctx, headersFrame.headers(), in, payLoadLength, decodeState.receivedFinalHeaders())) {
256                     out.add(headersFrame);
257                     return payLoadLength;
258                 }
259                 return -1;
260             case HTTP3_CANCEL_PUSH_FRAME_TYPE:
261                 // CANCEL_PUSH
262                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.3
263                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
264                         HTTP3_CANCEL_PUSH_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
265                     return 0;
266                 }
267                 int pushIdLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
268                 out.add(new DefaultHttp3CancelPushFrame(readVariableLengthInteger(in, pushIdLen)));
269                 return payLoadLength;
270             case HTTP3_SETTINGS_FRAME_TYPE:
271                 // SETTINGS
272                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.4
273 
274                 // Use 256 as this gives space for 16 maximal size encoder and 128 minimal size encoded settings.
275                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength, HTTP3_SETTINGS_FRAME_MAX_LEN,
276                         Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
277                     return 0;
278                 }
279                 Http3SettingsFrame settingsFrame = decodeSettings(ctx, in, payLoadLength);
280                 if (settingsFrame != null) {
281                     out.add(settingsFrame);
282                 }
283                 return payLoadLength;
284             case HTTP3_PUSH_PROMISE_FRAME_TYPE:
285                 // PUSH_PROMISE
286                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.5
287                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
288                         // Let's use the maxHeaderListSize as a limit as this is this is the decompressed amounts of
289                         // bytes which means the once we decompressed the headers we will be bigger then the actual
290                         // payload size now.
291                         Math.max(maxHeaderListSize, maxHeaderListSize + 8), Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
292                     return 0;
293                 }
294 
295                 assert qpackAttributes != null;
296                 if (!qpackAttributes.dynamicTableDisabled() && !qpackAttributes.decoderStreamAvailable()) {
297                     assert readResumptionListener != null;
298                     readResumptionListener.suspended();
299                     return 0;
300                 }
301                 int readerIdx = in.readerIndex();
302                 int pushPromiseIdLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
303                 Http3PushPromiseFrame pushPromiseFrame = new DefaultHttp3PushPromiseFrame(
304                         readVariableLengthInteger(in, pushPromiseIdLen));
305                 if (decodeHeaders(ctx, pushPromiseFrame.headers(), in, payLoadLength - pushPromiseIdLen, false)) {
306                     out.add(pushPromiseFrame);
307                     return payLoadLength;
308                 }
309                 in.readerIndex(readerIdx);
310                 return -1;
311             case HTTP3_GO_AWAY_FRAME_TYPE:
312                 // GO_AWAY
313                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.6
314                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
315                         HTTP3_GO_AWAY_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
316                     return 0;
317                 }
318                 int idLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
319                 out.add(new DefaultHttp3GoAwayFrame(readVariableLengthInteger(in, idLen)));
320                 return payLoadLength;
321             case HTTP3_MAX_PUSH_ID_FRAME_TYPE:
322                 // MAX_PUSH_ID
323                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.7
324                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
325                         HTTP3_MAX_PUSH_ID_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
326                     return 0;
327                 }
328                 int pidLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
329                 out.add(new DefaultHttp3MaxPushIdFrame(readVariableLengthInteger(in, pidLen)));
330                 return payLoadLength;
331             default:
332                 if (!Http3CodecUtils.isReservedFrameType(longType)) {
333                     return skipBytes(in, payLoadLength);
334                 }
335                 // Handling reserved frame types
336                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.8
337                 if (in.readableBytes() < payLoadLength) {
338                     return 0;
339                 }
340                 out.add(new DefaultHttp3UnknownFrame(longType, in.readRetainedSlice(payLoadLength)));
341                 return payLoadLength;
342         }
343     }
344 
345     private boolean enforceMaxPayloadLength(
346             ChannelHandlerContext ctx, ByteBuf in, int type, int payLoadLength,
347             long maxPayLoadLength, Http3ErrorCode error) {
348         if (payLoadLength > maxPayLoadLength) {
349             connectionError(ctx, error,
350                     "Received an invalid frame len " + payLoadLength + " for frame of type " + type + '.', true);
351             return false;
352         }
353         return in.readableBytes() >= payLoadLength;
354     }
355 
356     @Nullable
357     private Http3SettingsFrame decodeSettings(ChannelHandlerContext ctx, ByteBuf in, int payLoadLength) {
358         Http3SettingsFrame settingsFrame = new DefaultHttp3SettingsFrame(
359                 new Http3Settings(nonStandardSettingsValidator));
360         while (payLoadLength > 0) {
361             int keyLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
362             long key = readVariableLengthInteger(in, keyLen);
363             if (Http3CodecUtils.isReservedHttp2Setting(key)) {
364                 // This must be treated as a connection error
365                 // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.4.1
366                 connectionError(ctx, Http3ErrorCode.H3_SETTINGS_ERROR,
367                         "Received a settings key that is reserved for HTTP/2.", true);
368                 return null;
369             }
370             payLoadLength -= keyLen;
371             int valueLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
372             long value = readVariableLengthInteger(in, valueLen);
373             payLoadLength -= valueLen;
374 
375             if (settingsFrame.put(key, value) != null) {
376                 // This must be treated as a connection error
377                 // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.4
378                 connectionError(ctx, Http3ErrorCode.H3_SETTINGS_ERROR,
379                         "Received a duplicate settings key.", true);
380                 return null;
381             }
382         }
383         return settingsFrame;
384     }
385 
386     /**
387      * Decode the header block into header fields.
388      *
389      * @param ctx {@link ChannelHandlerContext} for this handler.
390      * @param headers to be populated by decode.
391      * @param in {@link ByteBuf} containing the encode header block. It is assumed that the entire header block is
392      *           contained in this buffer.
393      * @param length Number of bytes in the passed buffer that represent the encoded header block.
394      * @param trailer {@code true} if this is a trailer section.
395      * @return {@code true} if the headers were decoded, {@code false} otherwise. A header block may not be decoded if
396      * it is awaiting QPACK dynamic table updates.
397      */
398     private boolean decodeHeaders(ChannelHandlerContext ctx, Http3Headers headers, ByteBuf in, int length,
399                                   boolean trailer) {
400         try {
401             Http3HeadersSink sink = new Http3HeadersSink(headers, maxHeaderListSize, true, trailer);
402             assert qpackAttributes != null;
403             assert readResumptionListener != null;
404             if (qpackDecoder.decode(qpackAttributes,
405                     ((QuicStreamChannel) ctx.channel()).streamId(), in, length, sink, readResumptionListener)) {
406                 // Throws exception if detected any problem so far
407                 sink.finish();
408                 return true;
409             }
410             readResumptionListener.suspended();
411         } catch (Http3Exception e) {
412             connectionError(ctx, e.errorCode(), e.getMessage(), true);
413         } catch (QpackException e) {
414             // Must be treated as a connection error.
415             connectionError(ctx, Http3ErrorCode.QPACK_DECOMPRESSION_FAILED,
416                     "Decompression of header block failed.", true);
417         } catch (Http3HeadersValidationException e) {
418             error = true;
419             ctx.fireExceptionCaught(e);
420             // We should shutdown the stream with an error.
421             // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-4.1.3
422             Http3CodecUtils.streamError(ctx, Http3ErrorCode.H3_MESSAGE_ERROR);
423         }
424         return false;
425     }
426 
427     @Override
428     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
429         assert qpackAttributes != null;
430         if (writeResumptionListener != null) {
431             writeResumptionListener.enqueue(msg, promise);
432             return;
433         }
434 
435         if ((msg instanceof Http3HeadersFrame || msg instanceof Http3PushPromiseFrame) &&
436                 !qpackAttributes.dynamicTableDisabled() && !qpackAttributes.encoderStreamAvailable()) {
437             writeResumptionListener = WriteResumptionListener.newListener(ctx, this);
438             writeResumptionListener.enqueue(msg, promise);
439             return;
440         }
441 
442         write0(ctx, msg, promise);
443     }
444 
445     private void write0(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
446         try {
447             if (msg instanceof Http3DataFrame) {
448                 writeDataFrame(ctx, (Http3DataFrame) msg, promise);
449             } else if (msg instanceof Http3HeadersFrame) {
450                 writeHeadersFrame(ctx, (Http3HeadersFrame) msg, promise);
451             } else if (msg instanceof Http3CancelPushFrame) {
452                 writeCancelPushFrame(ctx, (Http3CancelPushFrame) msg, promise);
453             } else if (msg instanceof Http3SettingsFrame) {
454                 writeSettingsFrame(ctx, (Http3SettingsFrame) msg, promise);
455             } else if (msg instanceof Http3PushPromiseFrame) {
456                 writePushPromiseFrame(ctx, (Http3PushPromiseFrame) msg, promise);
457             } else if (msg instanceof Http3GoAwayFrame) {
458                 writeGoAwayFrame(ctx, (Http3GoAwayFrame) msg, promise);
459             } else if (msg instanceof Http3MaxPushIdFrame) {
460                 writeMaxPushIdFrame(ctx, (Http3MaxPushIdFrame) msg, promise);
461             } else if (msg instanceof Http3UnknownFrame) {
462                 writeUnknownFrame(ctx, (Http3UnknownFrame) msg, promise);
463             } else {
464                 unsupported(promise);
465             }
466         } finally {
467             ReferenceCountUtil.release(msg);
468         }
469     }
470 
471     private static void writeDataFrame(
472             ChannelHandlerContext ctx, Http3DataFrame frame, ChannelPromise promise) {
473         ByteBuf out = ctx.alloc().directBuffer(16);
474         writeVariableLengthInteger(out, frame.type());
475         writeVariableLengthInteger(out, frame.content().readableBytes());
476         ByteBuf content = frame.content().retain();
477         ctx.write(Unpooled.wrappedUnmodifiableBuffer(out, content), promise);
478     }
479 
480     private void writeHeadersFrame(ChannelHandlerContext ctx, Http3HeadersFrame frame, ChannelPromise promise) {
481         assert qpackAttributes != null;
482         final QuicStreamChannel channel = (QuicStreamChannel) ctx.channel();
483         writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
484             qpackEncoder.encodeHeaders(qpackAttributes, out, ctx.alloc(), channel.streamId(), f.headers());
485             return true;
486         }, promise);
487     }
488 
489     private static void writeCancelPushFrame(
490             ChannelHandlerContext ctx, Http3CancelPushFrame frame, ChannelPromise promise) {
491         writeFrameWithId(ctx, frame.type(), frame.id(), promise);
492     }
493 
494     private static void writeSettingsFrame(
495             ChannelHandlerContext ctx, Http3SettingsFrame frame, ChannelPromise promise) {
496         writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
497             for (Map.Entry<Long, Long> e : f) {
498                 Long key = e.getKey();
499                 if (Http3CodecUtils.isReservedHttp2Setting(key)) {
500                     Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_SETTINGS_ERROR,
501                             "Received a settings key that is reserved for HTTP/2.");
502                     promise.setFailure(exception);
503                     // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.8
504                     Http3CodecUtils.connectionError(ctx, exception, false);
505                     return false;
506                 }
507                 Long value = e.getValue();
508                 int keyLen = numBytesForVariableLengthInteger(key);
509                 int valueLen = numBytesForVariableLengthInteger(value);
510                 writeVariableLengthInteger(out, key, keyLen);
511                 writeVariableLengthInteger(out, value, valueLen);
512             }
513             return true;
514         }, promise);
515     }
516 
517     private static <T extends Http3Frame> void writeDynamicFrame(ChannelHandlerContext ctx, long type, T frame,
518                                                                  BiFunction<T, ByteBuf, Boolean> writer,
519                                                                  ChannelPromise promise) {
520         ByteBuf out = ctx.alloc().directBuffer();
521         int initialWriterIndex = out.writerIndex();
522         // Move 16 bytes forward as this is the maximum amount we could ever need for the type + payload length.
523         int payloadStartIndex = initialWriterIndex + 16;
524         out.writerIndex(payloadStartIndex);
525 
526         if (writer.apply(frame, out)) {
527             int finalWriterIndex = out.writerIndex();
528             int payloadLength = finalWriterIndex - payloadStartIndex;
529             int len = numBytesForVariableLengthInteger(payloadLength);
530             out.writerIndex(payloadStartIndex - len);
531             writeVariableLengthInteger(out, payloadLength, len);
532 
533             int typeLength = numBytesForVariableLengthInteger(type);
534             int startIndex = payloadStartIndex - len - typeLength;
535             out.writerIndex(startIndex);
536             writeVariableLengthInteger(out, type, typeLength);
537 
538             out.setIndex(startIndex, finalWriterIndex);
539             ctx.write(out, promise);
540         } else {
541             // We failed to encode, lets release the buffer so we dont leak.
542             out.release();
543         }
544     }
545 
546     private void writePushPromiseFrame(ChannelHandlerContext ctx, Http3PushPromiseFrame frame, ChannelPromise promise) {
547         assert qpackAttributes != null;
548         final QuicStreamChannel channel = (QuicStreamChannel) ctx.channel();
549         writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
550             long id = f.id();
551             writeVariableLengthInteger(out, id);
552             qpackEncoder.encodeHeaders(qpackAttributes, out, ctx.alloc(), channel.streamId(), f.headers());
553             return true;
554         }, promise);
555     }
556 
557     private static void writeGoAwayFrame(
558             ChannelHandlerContext ctx, Http3GoAwayFrame frame, ChannelPromise promise) {
559         writeFrameWithId(ctx, frame.type(), frame.id(), promise);
560     }
561 
562     private static void writeMaxPushIdFrame(
563             ChannelHandlerContext ctx, Http3MaxPushIdFrame frame, ChannelPromise promise) {
564         writeFrameWithId(ctx, frame.type(), frame.id(), promise);
565     }
566 
567     private static void writeFrameWithId(ChannelHandlerContext ctx, long type, long id, ChannelPromise promise) {
568         ByteBuf out = ctx.alloc().directBuffer(24);
569         writeVariableLengthInteger(out, type);
570         writeVariableLengthInteger(out, numBytesForVariableLengthInteger(id));
571         writeVariableLengthInteger(out, id);
572         ctx.write(out, promise);
573     }
574 
575     private void writeUnknownFrame(
576             ChannelHandlerContext ctx, Http3UnknownFrame frame, ChannelPromise promise) {
577         long type = frame.type();
578         if (Http3CodecUtils.isReservedHttp2FrameType(type)) {
579             Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_FRAME_UNEXPECTED,
580                     "Reserved type for HTTP/2 send.");
581             promise.setFailure(exception);
582             // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.8
583             connectionError(ctx, exception.errorCode(), exception.getMessage(), false);
584             return;
585         }
586         if (!Http3CodecUtils.isReservedFrameType(type)) {
587             Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_FRAME_UNEXPECTED,
588                     "Non reserved type for HTTP/3 send.");
589             promise.setFailure(exception);
590             return;
591         }
592         ByteBuf out = ctx.alloc().directBuffer();
593         writeVariableLengthInteger(out, type);
594         writeVariableLengthInteger(out, frame.content().readableBytes());
595         ByteBuf content = frame.content().retain();
596         ctx.write(Unpooled.wrappedUnmodifiableBuffer(out, content), promise);
597     }
598 
599     private void initReadResumptionListenerIfRequired(ChannelHandlerContext ctx) {
600         if (readResumptionListener == null) {
601             readResumptionListener = new ReadResumptionListener(ctx, this);
602         }
603     }
604 
605     private static void unsupported(ChannelPromise promise) {
606         promise.setFailure(new UnsupportedOperationException());
607     }
608 
609     @Override
610     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
611         ctx.bind(localAddress, promise);
612     }
613 
    /**
     * Hands the connect request on via {@code ctx.connect(...)}; this codec adds no handling
     * of its own for the operation.
     *
     * @param ctx the context on which the connect is continued.
     * @param remoteAddress the remote address, passed on unchanged.
     * @param localAddress the local address, passed on unchanged.
     * @param promise the promise, passed on unchanged.
     */
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
                        SocketAddress localAddress, ChannelPromise promise) {
        ctx.connect(remoteAddress, localAddress, promise);
    }
619 
    /**
     * Hands the disconnect request on via {@code ctx.disconnect(...)}; this codec adds no
     * handling of its own for the operation.
     *
     * @param ctx the context on which the disconnect is continued.
     * @param promise the promise, passed on unchanged.
     */
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
        ctx.disconnect(promise);
    }
624 
    /**
     * Hands the close request on via {@code ctx.close(...)}; this codec adds no handling of
     * its own for the operation.
     *
     * @param ctx the context on which the close is continued.
     * @param promise the promise, passed on unchanged.
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        ctx.close(promise);
    }
629 
    /**
     * Hands the deregister request on via {@code ctx.deregister(...)}; this codec adds no
     * handling of its own for the operation.
     *
     * @param ctx the context on which the deregister is continued.
     * @param promise the promise, passed on unchanged.
     */
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
        ctx.deregister(promise);
    }
634 
635     @Override
636     public void read(ChannelHandlerContext ctx) {
637         assert readResumptionListener != null;
638         if (readResumptionListener.readRequested()) {
639             ctx.read();
640         }
641     }
642 
643     @Override
644     public void flush(ChannelHandlerContext ctx) {
645         if (writeResumptionListener != null) {
646             writeResumptionListener.enqueueFlush();
647         } else {
648             ctx.flush();
649         }
650     }
651 
652     private static final class ReadResumptionListener
653             implements Runnable, GenericFutureListener<Future<? super QuicStreamChannel>> {
654         private static final int STATE_SUSPENDED = 0b1000_0000;
655         private static final int STATE_READ_PENDING = 0b0100_0000;
656         private static final int STATE_READ_COMPLETE_PENDING = 0b0010_0000;
657 
658         private final ChannelHandlerContext ctx;
659         private final Http3FrameCodec codec;
660         private byte state;
661 
662         ReadResumptionListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
663             this.ctx = ctx;
664             this.codec = codec;
665             assert codec.qpackAttributes != null;
666             if (!codec.qpackAttributes.dynamicTableDisabled() && !codec.qpackAttributes.decoderStreamAvailable()) {
667                 codec.qpackAttributes.whenDecoderStreamAvailable(this);
668             }
669         }
670 
671         void suspended() {
672             assert !codec.qpackAttributes.dynamicTableDisabled();
673             setState(STATE_SUSPENDED);
674         }
675 
676         boolean readCompleted() {
677             if (hasState(STATE_SUSPENDED)) {
678                 setState(STATE_READ_COMPLETE_PENDING);
679                 return false;
680             }
681             return true;
682         }
683 
684         boolean readRequested() {
685             if (hasState(STATE_SUSPENDED)) {
686                 setState(STATE_READ_PENDING);
687                 return false;
688             }
689             return true;
690         }
691 
692         boolean isSuspended() {
693             return hasState(STATE_SUSPENDED);
694         }
695 
696         @Override
697         public void operationComplete(Future<? super QuicStreamChannel> future) {
698             if (future.isSuccess()) {
699                 resume();
700             } else {
701                 ctx.fireExceptionCaught(future.cause());
702             }
703         }
704 
705         @Override
706         public void run() {
707             resume();
708         }
709 
710         private void resume() {
711             unsetState(STATE_SUSPENDED);
712             try {
713                 codec.channelRead(ctx, Unpooled.EMPTY_BUFFER);
714                 if (hasState(STATE_READ_COMPLETE_PENDING)) {
715                     unsetState(STATE_READ_COMPLETE_PENDING);
716                     codec.channelReadComplete(ctx);
717                 }
718                 if (hasState(STATE_READ_PENDING)) {
719                     unsetState(STATE_READ_PENDING);
720                     codec.read(ctx);
721                 }
722             } catch (Exception e) {
723                 ctx.fireExceptionCaught(e);
724             }
725         }
726 
727         private void setState(int toSet) {
728             state |= toSet;
729         }
730 
731         private boolean hasState(int toCheck) {
732             return (state & toCheck) == toCheck;
733         }
734 
735         private void unsetState(int toUnset) {
736             state &= ~toUnset;
737         }
738     }
739 
740     private static final class WriteResumptionListener
741             implements GenericFutureListener<Future<? super QuicStreamChannel>> {
742         private static final Object FLUSH = new Object();
743         private final PendingWriteQueue queue;
744         private final ChannelHandlerContext ctx;
745         private final Http3FrameCodec codec;
746 
747         private WriteResumptionListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
748             this.ctx = ctx;
749             this.codec = codec;
750             queue = new PendingWriteQueue(ctx);
751         }
752 
753         @Override
754         public void operationComplete(Future<? super QuicStreamChannel> future) {
755             drain();
756         }
757 
758         void enqueue(Object msg, ChannelPromise promise) {
759             assert ctx.channel().eventLoop().inEventLoop();
760             // Touch the message to allow easier debugging of memory leaks
761             ReferenceCountUtil.touch(msg);
762             queue.add(msg, promise);
763         }
764 
765         void enqueueFlush() {
766             assert ctx.channel().eventLoop().inEventLoop();
767             queue.add(FLUSH, ctx.voidPromise());
768         }
769 
770         void drain() {
771             assert ctx.channel().eventLoop().inEventLoop();
772             boolean flushSeen = false;
773             try {
774                 for (;;) {
775                     Object entry = queue.current();
776                     if (entry == null) {
777                         break;
778                     }
779                     if (entry == FLUSH) {
780                         flushSeen = true;
781                         queue.remove().trySuccess();
782                     } else {
783                         // Retain the entry as remove() will call release() as well.
784                         codec.write0(ctx, ReferenceCountUtil.retain(entry), queue.remove());
785                     }
786                 }
787                 // indicate that writes do not need to be enqueued. As we are on the eventloop, no other writes can
788                 // happen while we are draining, hence we would not write out of order.
789                 codec.writeResumptionListener = null;
790             } finally {
791                 if (flushSeen) {
792                     codec.flush(ctx);
793                 }
794             }
795         }
796 
797         static WriteResumptionListener newListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
798             WriteResumptionListener listener = new WriteResumptionListener(ctx, codec);
799             assert codec.qpackAttributes != null;
800             codec.qpackAttributes.whenEncoderStreamAvailable(listener);
801             return listener;
802         }
803     }
804 
    /**
     * A factory for creating codecs for HTTP/3 frames.
     */
    @FunctionalInterface
    interface Http3FrameCodecFactory {
        /**
         * Creates a new codec instance.
         *
         * @param validator for the frames.
         * @param encodeState for the request stream.
         * @param decodeState for the request stream.
         * @param nonStandardSettingsValidator validator for non-standard HTTP/3 settings
         *                                     (meaning inferred from the parameter type; confirm
         *                                     against {@code Http3Settings}).
         * @return new codec instance.
         */
        ChannelHandler newCodec(Http3FrameTypeValidator validator, Http3RequestStreamCodecState encodeState,
                                Http3RequestStreamCodecState decodeState,
                                Http3Settings.NonStandardHttp3SettingsValidator nonStandardSettingsValidator);
    }
822 }