View Javadoc
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http3;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandler;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelOutboundHandler;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.PendingWriteQueue;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.quic.QuicStreamChannel;
27  import io.netty.handler.codec.quic.QuicStreamFrame;
28  import io.netty.util.ReferenceCountUtil;
29  import io.netty.util.concurrent.Future;
30  import io.netty.util.concurrent.GenericFutureListener;
31  import org.jetbrains.annotations.Nullable;
32  
33  import java.net.SocketAddress;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.function.BiFunction;
37  
38  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_CANCEL_PUSH_FRAME_MAX_LEN;
39  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_CANCEL_PUSH_FRAME_TYPE;
40  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_DATA_FRAME_TYPE;
41  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_GO_AWAY_FRAME_MAX_LEN;
42  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_GO_AWAY_FRAME_TYPE;
43  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_HEADERS_FRAME_TYPE;
44  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_MAX_PUSH_ID_FRAME_MAX_LEN;
45  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_MAX_PUSH_ID_FRAME_TYPE;
46  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_PUSH_PROMISE_FRAME_TYPE;
47  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_SETTINGS_FRAME_MAX_LEN;
48  import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_SETTINGS_FRAME_TYPE;
49  import static io.netty.handler.codec.http3.Http3CodecUtils.numBytesForVariableLengthInteger;
50  import static io.netty.handler.codec.http3.Http3CodecUtils.readVariableLengthInteger;
51  import static io.netty.handler.codec.http3.Http3CodecUtils.writeVariableLengthInteger;
52  import static io.netty.util.internal.ObjectUtil.checkNotNull;
53  import static io.netty.util.internal.ObjectUtil.checkPositive;
54  
55  /**
56   * Decodes / encodes {@link Http3Frame}s.
57   */
58  final class Http3FrameCodec extends ByteToMessageDecoder implements ChannelOutboundHandler {
59      private final Http3FrameTypeValidator validator;
        // Upper bound handed to the header sink and used as the payload limit for HEADERS / PUSH_PROMISE frames.
60      private final long maxHeaderListSize;
61      private final QpackDecoder qpackDecoder;
62      private final QpackEncoder qpackEncoder;
63      private final Http3RequestStreamCodecState encodeState;
64      private final Http3RequestStreamCodecState decodeState;
65  
        // Passed to the validator for every frame type read; cleared once the first frame type was accepted.
66      private boolean firstFrame = true;
        // Set by the connectionError(...) helpers and on header validation failures; once set, decode(...) discards
        // all further input.
67      private boolean error;
        // Type of the frame that is currently being decoded, -1 while no frame type has been read yet.
68      private long type = -1;
        // Payload bytes of the current frame that are still to be consumed, -1 while the length has not been read.
69      private int payLoadLength = -1;
        // Looked up from the parent channel in handlerAdded(...).
70      private QpackAttributes qpackAttributes;
        // Created in handlerAdded(...) via initReadResumptionListenerIfRequired(...).
71      private ReadResumptionListener readResumptionListener;
        // Created lazily by write(...); while non-null all writes and flushes are enqueued on it.
72      private WriteResumptionListener writeResumptionListener;
73  
74      static Http3FrameCodecFactory newFactory(QpackDecoder qpackDecoder,
75                                               long maxHeaderListSize, QpackEncoder qpackEncoder) {
76          checkNotNull(qpackEncoder, "qpackEncoder");
77          checkNotNull(qpackDecoder, "qpackDecoder");
78  
79          // QPACK decoder and encoder are shared between streams in a connection.
80          return (validator, encodeState, decodeState) -> new Http3FrameCodec(validator, qpackDecoder,
81                  maxHeaderListSize, qpackEncoder, encodeState, decodeState);
82      }
83  
84      Http3FrameCodec(Http3FrameTypeValidator validator, QpackDecoder qpackDecoder,
85                      long maxHeaderListSize, QpackEncoder qpackEncoder, Http3RequestStreamCodecState encodeState,
86                      Http3RequestStreamCodecState decodeState) {
87          this.validator = checkNotNull(validator, "validator");
88          this.qpackDecoder = checkNotNull(qpackDecoder, "qpackDecoder");
89          this.maxHeaderListSize = checkPositive(maxHeaderListSize, "maxHeaderListSize");
90          this.qpackEncoder = checkNotNull(qpackEncoder, "qpackEncoder");
91          this.encodeState = checkNotNull(encodeState, "encodeState");
92          this.decodeState = checkNotNull(decodeState, "decodeState");
93      }
94  
95      @Override
96      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
97          qpackAttributes = Http3.getQpackAttributes(ctx.channel().parent());
98          assert qpackAttributes != null;
99  
100         initReadResumptionListenerIfRequired(ctx);
101         super.handlerAdded(ctx);
102     }
103 
104     @Override
105     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
106         if (writeResumptionListener != null) {
107             writeResumptionListener.drain();
108         }
109         super.channelInactive(ctx);
110     }
111 
112     @Override
113     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
114         if (writeResumptionListener != null) {
115             // drain everything so we are sure we never leak anything.
116             writeResumptionListener.drain();
117         }
118         super.handlerRemoved0(ctx);
119     }
120 
121     @Override
122     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
123         ByteBuf buffer;
124         if (msg instanceof QuicStreamFrame) {
125             QuicStreamFrame streamFrame = (QuicStreamFrame) msg;
126             buffer = streamFrame.content().retain();
127             streamFrame.release();
128         } else {
129             buffer = (ByteBuf) msg;
130         }
131         super.channelRead(ctx, buffer);
132     }
133 
134     @Override
135     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
136         assert readResumptionListener != null;
137         if (readResumptionListener.readCompleted()) {
138             super.channelReadComplete(ctx);
139         }
140     }
141 
142     private void connectionError(ChannelHandlerContext ctx, Http3ErrorCode code, String msg, boolean fireException) {
143         error = true;
144         Http3CodecUtils.connectionError(ctx, code, msg, fireException);
145     }
146 
147     private void connectionError(ChannelHandlerContext ctx, Http3Exception exception, boolean fireException) {
148         error = true;
149         Http3CodecUtils.connectionError(ctx, exception, fireException);
150     }
151 
152     @Override
153     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            // Incremental frame parser. Progress is kept in the fields 'type' and 'payLoadLength' (-1 means
            // "not read yet"), so a frame can be parsed across several invocations:
            //   1. read + validate the frame type, 2. read the payload length, 3. decode the payload.
154         assert readResumptionListener != null;
            // Nothing to do without input, and nothing may be consumed while reads are suspended.
155         if (!in.isReadable() || readResumptionListener.isSuspended()) {
156             return;
157         }
158         if (error) {
                // An error was reported before: discard everything that is still received.
159             in.skipBytes(in.readableBytes());
160             return;
161         }
162         if (type == -1) {
                // Peek at the first byte to learn how many bytes the variable-length integer occupies and wait
                // until all of them are readable.
163             int typeLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
164             if (in.readableBytes() < typeLen) {
165                 return;
166             }
167             long localType = readVariableLengthInteger(in, typeLen);
168             if (Http3CodecUtils.isReservedHttp2FrameType(localType)) {
169                 // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.8
170                 connectionError(ctx, Http3ErrorCode.H3_FRAME_UNEXPECTED,
171                         "Reserved type for HTTP/2 received.", true);
172                 return;
173             }
174             try {
175                 // Validate if the type is valid for the current stream first.
176                 validator.validate(localType, firstFrame);
177             } catch (Http3Exception e) {
178                 connectionError(ctx, e, true);
179                 return;
180             }
                // Only remember the type once it passed validation.
181             type = localType;
182             firstFrame = false;
183             if (!in.isReadable()) {
184                 return;
185             }
186         }
187         if (payLoadLength == -1) {
                // Same peek-then-read approach as for the type.
188             int payloadLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
189             assert payloadLen <= 8;
190             if (in.readableBytes() < payloadLen) {
191                 return;
192             }
193             long len = readVariableLengthInteger(in, payloadLen);
                // The remaining payload is tracked in an int, so anything larger cannot be handled.
194             if (len > Integer.MAX_VALUE) {
195                 connectionError(ctx, Http3ErrorCode.H3_EXCESSIVE_LOAD,
196                         "Received an invalid frame len.", true);
197                 return;
198             }
199             payLoadLength = (int) len;
200         }
201         int read = decodeFrame(ctx, type, payLoadLength, in, out);
            // A negative result leaves the state untouched, so the same frame is attempted again on the next call.
202         if (read >= 0) {
203             if (read == payLoadLength) {
                    // The whole payload was consumed: start over with the next frame.
204                 type = -1;
205                 payLoadLength = -1;
206             } else {
                    // Only a part of the payload was consumed: remember what is left.
207                 payLoadLength -= read;
208             }
209         }
210     }
211 
212     private static int skipBytes(ByteBuf in, int payLoadLength) {
213         in.skipBytes(payLoadLength);
214         return payLoadLength;
215     }
216 
217     private int decodeFrame(ChannelHandlerContext ctx, long longType, int payLoadLength, ByteBuf in, List<Object> out) {
            // Decodes (a part of) the payload of the frame with the given type and adds the result to 'out'.
            //
            // The return value is interpreted by decode(...):
            //   == payLoadLength : the frame is done, the next frame type is read afterwards.
            //   >= 0 (less)      : that many payload bytes were consumed, the remainder is still outstanding.
            //   -1               : nothing is changed, the same frame is attempted again later.
            // 0 is also returned after a connection error was reported by enforceMaxPayloadLength(...).
218         if (longType > Integer.MAX_VALUE && !Http3CodecUtils.isReservedFrameType(longType)) {
                // Too large to be one of the frame types handled below and not reserved either: ignore the payload.
219             return skipBytes(in, payLoadLength);
220         }
221         int type = (int) longType;
222         // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-11.2.1
223         switch (type) {
224             case HTTP3_DATA_FRAME_TYPE:
225                 // DATA
226                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.1
227                 int readable = in.readableBytes();
228                 if (readable == 0 && payLoadLength > 0) {
229                     return 0;
230                 }
                    // DATA is passed on as it arrives: emit whatever part of the payload is readable right now.
231                 int length = Math.min(readable, payLoadLength);
232                 out.add(new DefaultHttp3DataFrame(in.readRetainedSlice(length)));
233                 return length;
234             case HTTP3_HEADERS_FRAME_TYPE:
235                 // HEADERS
236                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.2
237                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
238                         // Let's use the maxHeaderListSize as a limit. It is the limit for the decompressed amount
239                         // of bytes, which means that once we decompressed the headers they will be bigger than
240                         // the actual payload size now.
241                         maxHeaderListSize, Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
242                     return 0;
243                 }
244                 assert qpackAttributes != null;
                    // With a dynamic table in use nothing is decoded before the decoder stream is available.
245                 if (!qpackAttributes.dynamicTableDisabled() && !qpackAttributes.decoderStreamAvailable()) {
246                     assert readResumptionListener != null;
247                     readResumptionListener.suspended();
248                     return 0;
249                 }
250 
251                 Http3HeadersFrame headersFrame = new DefaultHttp3HeadersFrame();
252                 if (decodeHeaders(ctx, headersFrame.headers(), in, payLoadLength, decodeState.receivedFinalHeaders())) {
253                     out.add(headersFrame);
254                     return payLoadLength;
255                 }
256                 return -1;
257             case HTTP3_CANCEL_PUSH_FRAME_TYPE:
258                 // CANCEL_PUSH
259                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.3
260                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
261                         HTTP3_CANCEL_PUSH_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
262                     return 0;
263                 }
                    // NOTE(review): the id is read without comparing its encoded length to payLoadLength, which is
                    // returned as consumed regardless - presumably the payload is expected to hold exactly one
                    // variable-length integer; confirm how empty or oversized payloads should be treated.
264                 int pushIdLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
265                 out.add(new DefaultHttp3CancelPushFrame(readVariableLengthInteger(in, pushIdLen)));
266                 return payLoadLength;
267             case HTTP3_SETTINGS_FRAME_TYPE:
268                 // SETTINGS
269                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.4
270 
271                 // Use 256 as this gives space for 16 maximal size encoded and 128 minimal size encoded settings.
272                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength, HTTP3_SETTINGS_FRAME_MAX_LEN,
273                         Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
274                     return 0;
275                 }
276                 Http3SettingsFrame settingsFrame = decodeSettings(ctx, in, payLoadLength);
                    // null means that decodeSettings(...) reported a connection error.
277                 if (settingsFrame != null) {
278                     out.add(settingsFrame);
279                 }
280                 return payLoadLength;
281             case HTTP3_PUSH_PROMISE_FRAME_TYPE:
282                 // PUSH_PROMISE
283                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.5
284                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
285                         // Let's use the maxHeaderListSize (plus 8 bytes, guarded against overflow by the max)
286                         // as a limit. It is the limit for the decompressed amount of bytes, which means that once
287                         // we decompressed the headers they will be bigger than the actual payload size now.
288                         Math.max(maxHeaderListSize, maxHeaderListSize + 8), Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
289                     return 0;
290                 }
291 
292                 assert qpackAttributes != null;
293                 if (!qpackAttributes.dynamicTableDisabled() && !qpackAttributes.decoderStreamAvailable()) {
294                     assert readResumptionListener != null;
295                     readResumptionListener.suspended();
296                     return 0;
297                 }
                    // Remember the position so the push id can be read again if the headers cannot be decoded yet.
298                 int readerIdx = in.readerIndex();
299                 int pushPromiseIdLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
300                 Http3PushPromiseFrame pushPromiseFrame = new DefaultHttp3PushPromiseFrame(
301                         readVariableLengthInteger(in, pushPromiseIdLen));
302                 if (decodeHeaders(ctx, pushPromiseFrame.headers(), in, payLoadLength - pushPromiseIdLen, false)) {
303                     out.add(pushPromiseFrame);
304                     return payLoadLength;
305                 }
306                 in.readerIndex(readerIdx);
307                 return -1;
308             case HTTP3_GO_AWAY_FRAME_TYPE:
309                 // GO_AWAY
310                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.6
311                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
312                         HTTP3_GO_AWAY_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
313                     return 0;
314                 }
315                 int idLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
316                 out.add(new DefaultHttp3GoAwayFrame(readVariableLengthInteger(in, idLen)));
317                 return payLoadLength;
318             case HTTP3_MAX_PUSH_ID_FRAME_TYPE:
319                 // MAX_PUSH_ID
320                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.7
321                 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
322                         HTTP3_MAX_PUSH_ID_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
323                     return 0;
324                 }
325                 int pidLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
326                 out.add(new DefaultHttp3MaxPushIdFrame(readVariableLengthInteger(in, pidLen)));
327                 return payLoadLength;
328             default:
                    // Frame types that are neither handled above nor reserved are ignored.
329                 if (!Http3CodecUtils.isReservedFrameType(longType)) {
330                     return skipBytes(in, payLoadLength);
331                 }
332                 // Handling reserved frame types
333                 // https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.8
                    // Reserved frames are surfaced as one unknown frame, so wait for the complete payload.
334                 if (in.readableBytes() < payLoadLength) {
335                     return 0;
336                 }
337                 out.add(new DefaultHttp3UnknownFrame(longType, in.readRetainedSlice(payLoadLength)));
338                 return payLoadLength;
339         }
340     }
341 
342     private boolean enforceMaxPayloadLength(
343             ChannelHandlerContext ctx, ByteBuf in, int type, int payLoadLength,
344             long maxPayLoadLength, Http3ErrorCode error) {
345         if (payLoadLength > maxPayLoadLength) {
346             connectionError(ctx, error,
347                     "Received an invalid frame len " + payLoadLength + " for frame of type " + type + '.', true);
348             return false;
349         }
350         return in.readableBytes() >= payLoadLength;
351     }
352 
353     @Nullable
354     private Http3SettingsFrame decodeSettings(ChannelHandlerContext ctx, ByteBuf in, int payLoadLength) {
355         Http3SettingsFrame settingsFrame = new DefaultHttp3SettingsFrame();
356         while (payLoadLength > 0) {
357             int keyLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
358             long key = readVariableLengthInteger(in, keyLen);
359             if (Http3CodecUtils.isReservedHttp2Setting(key)) {
360                 // This must be treated as a connection error
361                 // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.4.1
362                 connectionError(ctx, Http3ErrorCode.H3_SETTINGS_ERROR,
363                         "Received a settings key that is reserved for HTTP/2.", true);
364                 return null;
365             }
366             payLoadLength -= keyLen;
367             int valueLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
368             long value = readVariableLengthInteger(in, valueLen);
369             payLoadLength -= valueLen;
370 
371             if (settingsFrame.put(key, value) != null) {
372                 // This must be treated as a connection error
373                 // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.4
374                 connectionError(ctx, Http3ErrorCode.H3_SETTINGS_ERROR,
375                         "Received a duplicate settings key.", true);
376                 return null;
377             }
378         }
379         return settingsFrame;
380     }
381 
382     /**
383      * Decode the header block into header fields.
384      *
385      * @param ctx {@link ChannelHandlerContext} for this handler.
386      * @param headers to be populated by decode.
387      * @param in {@link ByteBuf} containing the encode header block. It is assumed that the entire header block is
388      *           contained in this buffer.
389      * @param length Number of bytes in the passed buffer that represent the encoded header block.
390      * @param trailer {@code true} if this is a trailer section.
391      * @return {@code true} if the headers were decoded, {@code false} otherwise. A header block may not be decoded if
392      * it is awaiting QPACK dynamic table updates.
393      */
394     private boolean decodeHeaders(ChannelHandlerContext ctx, Http3Headers headers, ByteBuf in, int length,
395                                   boolean trailer) {
396         try {
397             Http3HeadersSink sink = new Http3HeadersSink(headers, maxHeaderListSize, true, trailer);
398             assert qpackAttributes != null;
399             assert readResumptionListener != null;
400             if (qpackDecoder.decode(qpackAttributes,
401                     ((QuicStreamChannel) ctx.channel()).streamId(), in, length, sink, readResumptionListener)) {
402                 // Throws exception if detected any problem so far
403                 sink.finish();
404                 return true;
405             }
406             readResumptionListener.suspended();
407         } catch (Http3Exception e) {
408             connectionError(ctx, e.errorCode(), e.getMessage(), true);
409         } catch (QpackException e) {
410             // Must be treated as a connection error.
411             connectionError(ctx, Http3ErrorCode.QPACK_DECOMPRESSION_FAILED,
412                     "Decompression of header block failed.", true);
413         } catch (Http3HeadersValidationException e) {
414             error = true;
415             ctx.fireExceptionCaught(e);
416             // We should shutdown the stream with an error.
417             // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-4.1.3
418             Http3CodecUtils.streamError(ctx, Http3ErrorCode.H3_MESSAGE_ERROR);
419         }
420         return false;
421     }
422 
423     @Override
424     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
425         assert qpackAttributes != null;
426         if (writeResumptionListener != null) {
427             writeResumptionListener.enqueue(msg, promise);
428             return;
429         }
430 
431         if ((msg instanceof Http3HeadersFrame || msg instanceof Http3PushPromiseFrame) &&
432                 !qpackAttributes.dynamicTableDisabled() && !qpackAttributes.encoderStreamAvailable()) {
433             writeResumptionListener = WriteResumptionListener.newListener(ctx, this);
434             writeResumptionListener.enqueue(msg, promise);
435             return;
436         }
437 
438         write0(ctx, msg, promise);
439     }
440 
441     private void write0(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
442         try {
443             if (msg instanceof Http3DataFrame) {
444                 writeDataFrame(ctx, (Http3DataFrame) msg, promise);
445             } else if (msg instanceof Http3HeadersFrame) {
446                 writeHeadersFrame(ctx, (Http3HeadersFrame) msg, promise);
447             } else if (msg instanceof Http3CancelPushFrame) {
448                 writeCancelPushFrame(ctx, (Http3CancelPushFrame) msg, promise);
449             } else if (msg instanceof Http3SettingsFrame) {
450                 writeSettingsFrame(ctx, (Http3SettingsFrame) msg, promise);
451             } else if (msg instanceof Http3PushPromiseFrame) {
452                 writePushPromiseFrame(ctx, (Http3PushPromiseFrame) msg, promise);
453             } else if (msg instanceof Http3GoAwayFrame) {
454                 writeGoAwayFrame(ctx, (Http3GoAwayFrame) msg, promise);
455             } else if (msg instanceof Http3MaxPushIdFrame) {
456                 writeMaxPushIdFrame(ctx, (Http3MaxPushIdFrame) msg, promise);
457             } else if (msg instanceof Http3UnknownFrame) {
458                 writeUnknownFrame(ctx, (Http3UnknownFrame) msg, promise);
459             } else {
460                 unsupported(promise);
461             }
462         } finally {
463             ReferenceCountUtil.release(msg);
464         }
465     }
466 
467     private static void writeDataFrame(
468             ChannelHandlerContext ctx, Http3DataFrame frame, ChannelPromise promise) {
469         ByteBuf out = ctx.alloc().directBuffer(16);
470         writeVariableLengthInteger(out, frame.type());
471         writeVariableLengthInteger(out, frame.content().readableBytes());
472         ByteBuf content = frame.content().retain();
473         ctx.write(Unpooled.wrappedUnmodifiableBuffer(out, content), promise);
474     }
475 
476     private void writeHeadersFrame(ChannelHandlerContext ctx, Http3HeadersFrame frame, ChannelPromise promise) {
477         assert qpackAttributes != null;
478         final QuicStreamChannel channel = (QuicStreamChannel) ctx.channel();
479         writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
480             qpackEncoder.encodeHeaders(qpackAttributes, out, ctx.alloc(), channel.streamId(), f.headers());
481             return true;
482         }, promise);
483     }
484 
485     private static void writeCancelPushFrame(
486             ChannelHandlerContext ctx, Http3CancelPushFrame frame, ChannelPromise promise) {
487         writeFrameWithId(ctx, frame.type(), frame.id(), promise);
488     }
489 
490     private static void writeSettingsFrame(
491             ChannelHandlerContext ctx, Http3SettingsFrame frame, ChannelPromise promise) {
492         writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
493             for (Map.Entry<Long, Long> e : f) {
494                 Long key = e.getKey();
495                 if (Http3CodecUtils.isReservedHttp2Setting(key)) {
496                     Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_SETTINGS_ERROR,
497                             "Received a settings key that is reserved for HTTP/2.");
498                     promise.setFailure(exception);
499                     // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.8
500                     Http3CodecUtils.connectionError(ctx, exception, false);
501                     return false;
502                 }
503                 Long value = e.getValue();
504                 int keyLen = numBytesForVariableLengthInteger(key);
505                 int valueLen = numBytesForVariableLengthInteger(value);
506                 writeVariableLengthInteger(out, key, keyLen);
507                 writeVariableLengthInteger(out, value, valueLen);
508             }
509             return true;
510         }, promise);
511     }
512 
513     private static <T extends Http3Frame> void writeDynamicFrame(ChannelHandlerContext ctx, long type, T frame,
514                                                                  BiFunction<T, ByteBuf, Boolean> writer,
515                                                                  ChannelPromise promise) {
516         ByteBuf out = ctx.alloc().directBuffer();
517         int initialWriterIndex = out.writerIndex();
518         // Move 16 bytes forward as this is the maximum amount we could ever need for the type + payload length.
519         int payloadStartIndex = initialWriterIndex + 16;
520         out.writerIndex(payloadStartIndex);
521 
522         if (writer.apply(frame, out)) {
523             int finalWriterIndex = out.writerIndex();
524             int payloadLength = finalWriterIndex - payloadStartIndex;
525             int len = numBytesForVariableLengthInteger(payloadLength);
526             out.writerIndex(payloadStartIndex - len);
527             writeVariableLengthInteger(out, payloadLength, len);
528 
529             int typeLength = numBytesForVariableLengthInteger(type);
530             int startIndex = payloadStartIndex - len - typeLength;
531             out.writerIndex(startIndex);
532             writeVariableLengthInteger(out, type, typeLength);
533 
534             out.setIndex(startIndex, finalWriterIndex);
535             ctx.write(out, promise);
536         } else {
537             // We failed to encode, lets release the buffer so we dont leak.
538             out.release();
539         }
540     }
541 
542     private void writePushPromiseFrame(ChannelHandlerContext ctx, Http3PushPromiseFrame frame, ChannelPromise promise) {
543         assert qpackAttributes != null;
544         final QuicStreamChannel channel = (QuicStreamChannel) ctx.channel();
545         writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
546             long id = f.id();
547             writeVariableLengthInteger(out, id);
548             qpackEncoder.encodeHeaders(qpackAttributes, out, ctx.alloc(), channel.streamId(), f.headers());
549             return true;
550         }, promise);
551     }
552 
553     private static void writeGoAwayFrame(
554             ChannelHandlerContext ctx, Http3GoAwayFrame frame, ChannelPromise promise) {
555         writeFrameWithId(ctx, frame.type(), frame.id(), promise);
556     }
557 
558     private static void writeMaxPushIdFrame(
559             ChannelHandlerContext ctx, Http3MaxPushIdFrame frame, ChannelPromise promise) {
560         writeFrameWithId(ctx, frame.type(), frame.id(), promise);
561     }
562 
563     private static void writeFrameWithId(ChannelHandlerContext ctx, long type, long id, ChannelPromise promise) {
564         ByteBuf out = ctx.alloc().directBuffer(24);
565         writeVariableLengthInteger(out, type);
566         writeVariableLengthInteger(out, numBytesForVariableLengthInteger(id));
567         writeVariableLengthInteger(out, id);
568         ctx.write(out, promise);
569     }
570 
571     private void writeUnknownFrame(
572             ChannelHandlerContext ctx, Http3UnknownFrame frame, ChannelPromise promise) {
573         long type = frame.type();
574         if (Http3CodecUtils.isReservedHttp2FrameType(type)) {
575             Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_FRAME_UNEXPECTED,
576                     "Reserved type for HTTP/2 send.");
577             promise.setFailure(exception);
578             // See https://tools.ietf.org/html/draft-ietf-quic-http-32#section-7.2.8
579             connectionError(ctx, exception.errorCode(), exception.getMessage(), false);
580             return;
581         }
582         if (!Http3CodecUtils.isReservedFrameType(type)) {
583             Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_FRAME_UNEXPECTED,
584                     "Non reserved type for HTTP/3 send.");
585             promise.setFailure(exception);
586             return;
587         }
588         ByteBuf out = ctx.alloc().directBuffer();
589         writeVariableLengthInteger(out, type);
590         writeVariableLengthInteger(out, frame.content().readableBytes());
591         ByteBuf content = frame.content().retain();
592         ctx.write(Unpooled.wrappedUnmodifiableBuffer(out, content), promise);
593     }
594 
595     private void initReadResumptionListenerIfRequired(ChannelHandlerContext ctx) {
596         if (readResumptionListener == null) {
597             readResumptionListener = new ReadResumptionListener(ctx, this);
598         }
599     }
600 
601     private static void unsupported(ChannelPromise promise) {
602         promise.setFailure(new UnsupportedOperationException());
603     }
604 
605     @Override
606     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
607         ctx.bind(localAddress, promise);
608     }
609 
610     @Override
611     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
612                         SocketAddress localAddress, ChannelPromise promise) {
613         ctx.connect(remoteAddress, localAddress, promise);
614     }
615 
616     @Override
617     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
618         ctx.disconnect(promise);
619     }
620 
621     @Override
622     public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
623         ctx.close(promise);
624     }
625 
    /**
     * Passes the deregister request on through {@code ctx} unchanged; this handler adds no
     * logic of its own to the operation.
     *
     * @param ctx the context the request is forwarded through.
     * @param promise the promise forwarded together with the request.
     */
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
        ctx.deregister(promise);
    }
630 
631     @Override
632     public void read(ChannelHandlerContext ctx) {
633         assert readResumptionListener != null;
634         if (readResumptionListener.readRequested()) {
635             ctx.read();
636         }
637     }
638 
639     @Override
640     public void flush(ChannelHandlerContext ctx) {
641         if (writeResumptionListener != null) {
642             writeResumptionListener.enqueueFlush();
643         } else {
644             ctx.flush();
645         }
646     }
647 
648     private static final class ReadResumptionListener
649             implements Runnable, GenericFutureListener<Future<? super QuicStreamChannel>> {
650         private static final int STATE_SUSPENDED = 0b1000_0000;
651         private static final int STATE_READ_PENDING = 0b0100_0000;
652         private static final int STATE_READ_COMPLETE_PENDING = 0b0010_0000;
653 
654         private final ChannelHandlerContext ctx;
655         private final Http3FrameCodec codec;
656         private byte state;
657 
658         ReadResumptionListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
659             this.ctx = ctx;
660             this.codec = codec;
661             assert codec.qpackAttributes != null;
662             if (!codec.qpackAttributes.dynamicTableDisabled() && !codec.qpackAttributes.decoderStreamAvailable()) {
663                 codec.qpackAttributes.whenDecoderStreamAvailable(this);
664             }
665         }
666 
667         void suspended() {
668             assert !codec.qpackAttributes.dynamicTableDisabled();
669             setState(STATE_SUSPENDED);
670         }
671 
672         boolean readCompleted() {
673             if (hasState(STATE_SUSPENDED)) {
674                 setState(STATE_READ_COMPLETE_PENDING);
675                 return false;
676             }
677             return true;
678         }
679 
680         boolean readRequested() {
681             if (hasState(STATE_SUSPENDED)) {
682                 setState(STATE_READ_PENDING);
683                 return false;
684             }
685             return true;
686         }
687 
688         boolean isSuspended() {
689             return hasState(STATE_SUSPENDED);
690         }
691 
692         @Override
693         public void operationComplete(Future<? super QuicStreamChannel> future) {
694             if (future.isSuccess()) {
695                 resume();
696             } else {
697                 ctx.fireExceptionCaught(future.cause());
698             }
699         }
700 
701         @Override
702         public void run() {
703             resume();
704         }
705 
706         private void resume() {
707             unsetState(STATE_SUSPENDED);
708             try {
709                 codec.channelRead(ctx, Unpooled.EMPTY_BUFFER);
710                 if (hasState(STATE_READ_COMPLETE_PENDING)) {
711                     unsetState(STATE_READ_COMPLETE_PENDING);
712                     codec.channelReadComplete(ctx);
713                 }
714                 if (hasState(STATE_READ_PENDING)) {
715                     unsetState(STATE_READ_PENDING);
716                     codec.read(ctx);
717                 }
718             } catch (Exception e) {
719                 ctx.fireExceptionCaught(e);
720             }
721         }
722 
723         private void setState(int toSet) {
724             state |= toSet;
725         }
726 
727         private boolean hasState(int toCheck) {
728             return (state & toCheck) == toCheck;
729         }
730 
731         private void unsetState(int toUnset) {
732             state &= ~toUnset;
733         }
734     }
735 
736     private static final class WriteResumptionListener
737             implements GenericFutureListener<Future<? super QuicStreamChannel>> {
738         private static final Object FLUSH = new Object();
739         private final PendingWriteQueue queue;
740         private final ChannelHandlerContext ctx;
741         private final Http3FrameCodec codec;
742 
743         private WriteResumptionListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
744             this.ctx = ctx;
745             this.codec = codec;
746             queue = new PendingWriteQueue(ctx);
747         }
748 
749         @Override
750         public void operationComplete(Future<? super QuicStreamChannel> future) {
751             drain();
752         }
753 
754         void enqueue(Object msg, ChannelPromise promise) {
755             assert ctx.channel().eventLoop().inEventLoop();
756             // Touch the message to allow easier debugging of memory leaks
757             ReferenceCountUtil.touch(msg);
758             queue.add(msg, promise);
759         }
760 
761         void enqueueFlush() {
762             assert ctx.channel().eventLoop().inEventLoop();
763             queue.add(FLUSH, ctx.voidPromise());
764         }
765 
766         void drain() {
767             assert ctx.channel().eventLoop().inEventLoop();
768             boolean flushSeen = false;
769             try {
770                 for (;;) {
771                     Object entry = queue.current();
772                     if (entry == null) {
773                         break;
774                     }
775                     if (entry == FLUSH) {
776                         flushSeen = true;
777                         queue.remove().trySuccess();
778                     } else {
779                         // Retain the entry as remove() will call release() as well.
780                         codec.write0(ctx, ReferenceCountUtil.retain(entry), queue.remove());
781                     }
782                 }
783                 // indicate that writes do not need to be enqueued. As we are on the eventloop, no other writes can
784                 // happen while we are draining, hence we would not write out of order.
785                 codec.writeResumptionListener = null;
786             } finally {
787                 if (flushSeen) {
788                     codec.flush(ctx);
789                 }
790             }
791         }
792 
793         static WriteResumptionListener newListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
794             WriteResumptionListener listener = new WriteResumptionListener(ctx, codec);
795             assert codec.qpackAttributes != null;
796             codec.qpackAttributes.whenEncoderStreamAvailable(listener);
797             return listener;
798         }
799     }
800 
    /**
     * A factory for creating codecs for HTTP/3 frames.
     */
    @FunctionalInterface
    interface Http3FrameCodecFactory {
        /**
         * Creates a new codec instance that uses the passed validator and codec states.
         *
         * @param validator for the frames.
         * @param encodeState for the request stream.
         * @param decodeState for the request stream.
         * @return new codec instance, returned as a {@link ChannelHandler}.
         */
        ChannelHandler newCodec(Http3FrameTypeValidator validator, Http3RequestStreamCodecState encodeState,
                                Http3RequestStreamCodecState decodeState);
    }
817 }