1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http3;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelHandler;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelOutboundHandler;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.PendingWriteQueue;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.quic.QuicStreamChannel;
27 import io.netty.handler.codec.quic.QuicStreamFrame;
28 import io.netty.util.ReferenceCountUtil;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.GenericFutureListener;
31 import org.jetbrains.annotations.Nullable;
32
33 import java.net.SocketAddress;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.function.BiFunction;
37
38 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_CANCEL_PUSH_FRAME_MAX_LEN;
39 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_CANCEL_PUSH_FRAME_TYPE;
40 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_DATA_FRAME_TYPE;
41 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_GO_AWAY_FRAME_MAX_LEN;
42 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_GO_AWAY_FRAME_TYPE;
43 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_HEADERS_FRAME_TYPE;
44 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_MAX_PUSH_ID_FRAME_MAX_LEN;
45 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_MAX_PUSH_ID_FRAME_TYPE;
46 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_PUSH_PROMISE_FRAME_TYPE;
47 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_SETTINGS_FRAME_MAX_LEN;
48 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_SETTINGS_FRAME_TYPE;
49 import static io.netty.handler.codec.http3.Http3CodecUtils.numBytesForVariableLengthInteger;
50 import static io.netty.handler.codec.http3.Http3CodecUtils.readVariableLengthInteger;
51 import static io.netty.handler.codec.http3.Http3CodecUtils.writeVariableLengthInteger;
52 import static io.netty.util.internal.ObjectUtil.checkNotNull;
53 import static io.netty.util.internal.ObjectUtil.checkPositive;
54
55
56
57
58 final class Http3FrameCodec extends ByteToMessageDecoder implements ChannelOutboundHandler {
59 private final Http3FrameTypeValidator validator;
60 private final long maxHeaderListSize;
61 private final QpackDecoder qpackDecoder;
62 private final QpackEncoder qpackEncoder;
63 private final Http3RequestStreamCodecState encodeState;
64 private final Http3RequestStreamCodecState decodeState;
65
66 private boolean firstFrame = true;
67 private boolean error;
68 private long type = -1;
69 private int payLoadLength = -1;
70 private QpackAttributes qpackAttributes;
71 private ReadResumptionListener readResumptionListener;
72 private WriteResumptionListener writeResumptionListener;
73
74 static Http3FrameCodecFactory newFactory(QpackDecoder qpackDecoder,
75 long maxHeaderListSize, QpackEncoder qpackEncoder) {
76 checkNotNull(qpackEncoder, "qpackEncoder");
77 checkNotNull(qpackDecoder, "qpackDecoder");
78
79
80 return (validator, encodeState, decodeState) -> new Http3FrameCodec(validator, qpackDecoder,
81 maxHeaderListSize, qpackEncoder, encodeState, decodeState);
82 }
83
84 Http3FrameCodec(Http3FrameTypeValidator validator, QpackDecoder qpackDecoder,
85 long maxHeaderListSize, QpackEncoder qpackEncoder, Http3RequestStreamCodecState encodeState,
86 Http3RequestStreamCodecState decodeState) {
87 this.validator = checkNotNull(validator, "validator");
88 this.qpackDecoder = checkNotNull(qpackDecoder, "qpackDecoder");
89 this.maxHeaderListSize = checkPositive(maxHeaderListSize, "maxHeaderListSize");
90 this.qpackEncoder = checkNotNull(qpackEncoder, "qpackEncoder");
91 this.encodeState = checkNotNull(encodeState, "encodeState");
92 this.decodeState = checkNotNull(decodeState, "decodeState");
93 }
94
95 @Override
96 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
97 qpackAttributes = Http3.getQpackAttributes(ctx.channel().parent());
98 assert qpackAttributes != null;
99
100 initReadResumptionListenerIfRequired(ctx);
101 super.handlerAdded(ctx);
102 }
103
104 @Override
105 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
106 if (writeResumptionListener != null) {
107 writeResumptionListener.drain();
108 }
109 super.channelInactive(ctx);
110 }
111
112 @Override
113 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
114 if (writeResumptionListener != null) {
115
116 writeResumptionListener.drain();
117 }
118 super.handlerRemoved0(ctx);
119 }
120
121 @Override
122 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
123 ByteBuf buffer;
124 if (msg instanceof QuicStreamFrame) {
125 QuicStreamFrame streamFrame = (QuicStreamFrame) msg;
126 buffer = streamFrame.content().retain();
127 streamFrame.release();
128 } else {
129 buffer = (ByteBuf) msg;
130 }
131 super.channelRead(ctx, buffer);
132 }
133
134 @Override
135 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
136 assert readResumptionListener != null;
137 if (readResumptionListener.readCompleted()) {
138 super.channelReadComplete(ctx);
139 }
140 }
141
142 private void connectionError(ChannelHandlerContext ctx, Http3ErrorCode code, String msg, boolean fireException) {
143 error = true;
144 Http3CodecUtils.connectionError(ctx, code, msg, fireException);
145 }
146
147 private void connectionError(ChannelHandlerContext ctx, Http3Exception exception, boolean fireException) {
148 error = true;
149 Http3CodecUtils.connectionError(ctx, exception, fireException);
150 }
151
152 @Override
153 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
154 assert readResumptionListener != null;
155 if (!in.isReadable() || readResumptionListener.isSuspended()) {
156 return;
157 }
158 if (error) {
159 in.skipBytes(in.readableBytes());
160 return;
161 }
162 if (type == -1) {
163 int typeLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
164 if (in.readableBytes() < typeLen) {
165 return;
166 }
167 long localType = readVariableLengthInteger(in, typeLen);
168 if (Http3CodecUtils.isReservedHttp2FrameType(localType)) {
169
170 connectionError(ctx, Http3ErrorCode.H3_FRAME_UNEXPECTED,
171 "Reserved type for HTTP/2 received.", true);
172 return;
173 }
174 try {
175
176 validator.validate(localType, firstFrame);
177 } catch (Http3Exception e) {
178 connectionError(ctx, e, true);
179 return;
180 }
181 type = localType;
182 firstFrame = false;
183 if (!in.isReadable()) {
184 return;
185 }
186 }
187 if (payLoadLength == -1) {
188 int payloadLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
189 assert payloadLen <= 8;
190 if (in.readableBytes() < payloadLen) {
191 return;
192 }
193 long len = readVariableLengthInteger(in, payloadLen);
194 if (len > Integer.MAX_VALUE) {
195 connectionError(ctx, Http3ErrorCode.H3_EXCESSIVE_LOAD,
196 "Received an invalid frame len.", true);
197 return;
198 }
199 payLoadLength = (int) len;
200 }
201 int read = decodeFrame(ctx, type, payLoadLength, in, out);
202 if (read >= 0) {
203 if (read == payLoadLength) {
204 type = -1;
205 payLoadLength = -1;
206 } else {
207 payLoadLength -= read;
208 }
209 }
210 }
211
212 private static int skipBytes(ByteBuf in, int payLoadLength) {
213 in.skipBytes(payLoadLength);
214 return payLoadLength;
215 }
216
217 private int decodeFrame(ChannelHandlerContext ctx, long longType, int payLoadLength, ByteBuf in, List<Object> out) {
218 if (longType > Integer.MAX_VALUE && !Http3CodecUtils.isReservedFrameType(longType)) {
219 return skipBytes(in, payLoadLength);
220 }
221 int type = (int) longType;
222
223 switch (type) {
224 case HTTP3_DATA_FRAME_TYPE:
225
226
227 int readable = in.readableBytes();
228 if (readable == 0 && payLoadLength > 0) {
229 return 0;
230 }
231 int length = Math.min(readable, payLoadLength);
232 out.add(new DefaultHttp3DataFrame(in.readRetainedSlice(length)));
233 return length;
234 case HTTP3_HEADERS_FRAME_TYPE:
235
236
237 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
238
239
240
241 maxHeaderListSize, Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
242 return 0;
243 }
244 assert qpackAttributes != null;
245 if (!qpackAttributes.dynamicTableDisabled() && !qpackAttributes.decoderStreamAvailable()) {
246 assert readResumptionListener != null;
247 readResumptionListener.suspended();
248 return 0;
249 }
250
251 Http3HeadersFrame headersFrame = new DefaultHttp3HeadersFrame();
252 if (decodeHeaders(ctx, headersFrame.headers(), in, payLoadLength, decodeState.receivedFinalHeaders())) {
253 out.add(headersFrame);
254 return payLoadLength;
255 }
256 return -1;
257 case HTTP3_CANCEL_PUSH_FRAME_TYPE:
258
259
260 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
261 HTTP3_CANCEL_PUSH_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
262 return 0;
263 }
264 int pushIdLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
265 out.add(new DefaultHttp3CancelPushFrame(readVariableLengthInteger(in, pushIdLen)));
266 return payLoadLength;
267 case HTTP3_SETTINGS_FRAME_TYPE:
268
269
270
271
272 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength, HTTP3_SETTINGS_FRAME_MAX_LEN,
273 Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
274 return 0;
275 }
276 Http3SettingsFrame settingsFrame = decodeSettings(ctx, in, payLoadLength);
277 if (settingsFrame != null) {
278 out.add(settingsFrame);
279 }
280 return payLoadLength;
281 case HTTP3_PUSH_PROMISE_FRAME_TYPE:
282
283
284 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
285
286
287
288 Math.max(maxHeaderListSize, maxHeaderListSize + 8), Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
289 return 0;
290 }
291
292 assert qpackAttributes != null;
293 if (!qpackAttributes.dynamicTableDisabled() && !qpackAttributes.decoderStreamAvailable()) {
294 assert readResumptionListener != null;
295 readResumptionListener.suspended();
296 return 0;
297 }
298 int readerIdx = in.readerIndex();
299 int pushPromiseIdLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
300 Http3PushPromiseFrame pushPromiseFrame = new DefaultHttp3PushPromiseFrame(
301 readVariableLengthInteger(in, pushPromiseIdLen));
302 if (decodeHeaders(ctx, pushPromiseFrame.headers(), in, payLoadLength - pushPromiseIdLen, false)) {
303 out.add(pushPromiseFrame);
304 return payLoadLength;
305 }
306 in.readerIndex(readerIdx);
307 return -1;
308 case HTTP3_GO_AWAY_FRAME_TYPE:
309
310
311 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
312 HTTP3_GO_AWAY_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
313 return 0;
314 }
315 int idLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
316 out.add(new DefaultHttp3GoAwayFrame(readVariableLengthInteger(in, idLen)));
317 return payLoadLength;
318 case HTTP3_MAX_PUSH_ID_FRAME_TYPE:
319
320
321 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
322 HTTP3_MAX_PUSH_ID_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
323 return 0;
324 }
325 int pidLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
326 out.add(new DefaultHttp3MaxPushIdFrame(readVariableLengthInteger(in, pidLen)));
327 return payLoadLength;
328 default:
329 if (!Http3CodecUtils.isReservedFrameType(longType)) {
330 return skipBytes(in, payLoadLength);
331 }
332
333
334 if (in.readableBytes() < payLoadLength) {
335 return 0;
336 }
337 out.add(new DefaultHttp3UnknownFrame(longType, in.readRetainedSlice(payLoadLength)));
338 return payLoadLength;
339 }
340 }
341
342 private boolean enforceMaxPayloadLength(
343 ChannelHandlerContext ctx, ByteBuf in, int type, int payLoadLength,
344 long maxPayLoadLength, Http3ErrorCode error) {
345 if (payLoadLength > maxPayLoadLength) {
346 connectionError(ctx, error,
347 "Received an invalid frame len " + payLoadLength + " for frame of type " + type + '.', true);
348 return false;
349 }
350 return in.readableBytes() >= payLoadLength;
351 }
352
353 @Nullable
354 private Http3SettingsFrame decodeSettings(ChannelHandlerContext ctx, ByteBuf in, int payLoadLength) {
355 Http3SettingsFrame settingsFrame = new DefaultHttp3SettingsFrame();
356 while (payLoadLength > 0) {
357 int keyLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
358 long key = readVariableLengthInteger(in, keyLen);
359 if (Http3CodecUtils.isReservedHttp2Setting(key)) {
360
361
362 connectionError(ctx, Http3ErrorCode.H3_SETTINGS_ERROR,
363 "Received a settings key that is reserved for HTTP/2.", true);
364 return null;
365 }
366 payLoadLength -= keyLen;
367 int valueLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
368 long value = readVariableLengthInteger(in, valueLen);
369 payLoadLength -= valueLen;
370
371 if (settingsFrame.put(key, value) != null) {
372
373
374 connectionError(ctx, Http3ErrorCode.H3_SETTINGS_ERROR,
375 "Received a duplicate settings key.", true);
376 return null;
377 }
378 }
379 return settingsFrame;
380 }
381
382
383
384
385
386
387
388
389
390
391
392
393
394 private boolean decodeHeaders(ChannelHandlerContext ctx, Http3Headers headers, ByteBuf in, int length,
395 boolean trailer) {
396 try {
397 Http3HeadersSink sink = new Http3HeadersSink(headers, maxHeaderListSize, true, trailer);
398 assert qpackAttributes != null;
399 assert readResumptionListener != null;
400 if (qpackDecoder.decode(qpackAttributes,
401 ((QuicStreamChannel) ctx.channel()).streamId(), in, length, sink, readResumptionListener)) {
402
403 sink.finish();
404 return true;
405 }
406 readResumptionListener.suspended();
407 } catch (Http3Exception e) {
408 connectionError(ctx, e.errorCode(), e.getMessage(), true);
409 } catch (QpackException e) {
410
411 connectionError(ctx, Http3ErrorCode.QPACK_DECOMPRESSION_FAILED,
412 "Decompression of header block failed.", true);
413 } catch (Http3HeadersValidationException e) {
414 error = true;
415 ctx.fireExceptionCaught(e);
416
417
418 Http3CodecUtils.streamError(ctx, Http3ErrorCode.H3_MESSAGE_ERROR);
419 }
420 return false;
421 }
422
423 @Override
424 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
425 assert qpackAttributes != null;
426 if (writeResumptionListener != null) {
427 writeResumptionListener.enqueue(msg, promise);
428 return;
429 }
430
431 if ((msg instanceof Http3HeadersFrame || msg instanceof Http3PushPromiseFrame) &&
432 !qpackAttributes.dynamicTableDisabled() && !qpackAttributes.encoderStreamAvailable()) {
433 writeResumptionListener = WriteResumptionListener.newListener(ctx, this);
434 writeResumptionListener.enqueue(msg, promise);
435 return;
436 }
437
438 write0(ctx, msg, promise);
439 }
440
441 private void write0(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
442 try {
443 if (msg instanceof Http3DataFrame) {
444 writeDataFrame(ctx, (Http3DataFrame) msg, promise);
445 } else if (msg instanceof Http3HeadersFrame) {
446 writeHeadersFrame(ctx, (Http3HeadersFrame) msg, promise);
447 } else if (msg instanceof Http3CancelPushFrame) {
448 writeCancelPushFrame(ctx, (Http3CancelPushFrame) msg, promise);
449 } else if (msg instanceof Http3SettingsFrame) {
450 writeSettingsFrame(ctx, (Http3SettingsFrame) msg, promise);
451 } else if (msg instanceof Http3PushPromiseFrame) {
452 writePushPromiseFrame(ctx, (Http3PushPromiseFrame) msg, promise);
453 } else if (msg instanceof Http3GoAwayFrame) {
454 writeGoAwayFrame(ctx, (Http3GoAwayFrame) msg, promise);
455 } else if (msg instanceof Http3MaxPushIdFrame) {
456 writeMaxPushIdFrame(ctx, (Http3MaxPushIdFrame) msg, promise);
457 } else if (msg instanceof Http3UnknownFrame) {
458 writeUnknownFrame(ctx, (Http3UnknownFrame) msg, promise);
459 } else {
460 unsupported(promise);
461 }
462 } finally {
463 ReferenceCountUtil.release(msg);
464 }
465 }
466
467 private static void writeDataFrame(
468 ChannelHandlerContext ctx, Http3DataFrame frame, ChannelPromise promise) {
469 ByteBuf out = ctx.alloc().directBuffer(16);
470 writeVariableLengthInteger(out, frame.type());
471 writeVariableLengthInteger(out, frame.content().readableBytes());
472 ByteBuf content = frame.content().retain();
473 ctx.write(Unpooled.wrappedUnmodifiableBuffer(out, content), promise);
474 }
475
476 private void writeHeadersFrame(ChannelHandlerContext ctx, Http3HeadersFrame frame, ChannelPromise promise) {
477 assert qpackAttributes != null;
478 final QuicStreamChannel channel = (QuicStreamChannel) ctx.channel();
479 writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
480 qpackEncoder.encodeHeaders(qpackAttributes, out, ctx.alloc(), channel.streamId(), f.headers());
481 return true;
482 }, promise);
483 }
484
485 private static void writeCancelPushFrame(
486 ChannelHandlerContext ctx, Http3CancelPushFrame frame, ChannelPromise promise) {
487 writeFrameWithId(ctx, frame.type(), frame.id(), promise);
488 }
489
490 private static void writeSettingsFrame(
491 ChannelHandlerContext ctx, Http3SettingsFrame frame, ChannelPromise promise) {
492 writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
493 for (Map.Entry<Long, Long> e : f) {
494 Long key = e.getKey();
495 if (Http3CodecUtils.isReservedHttp2Setting(key)) {
496 Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_SETTINGS_ERROR,
497 "Received a settings key that is reserved for HTTP/2.");
498 promise.setFailure(exception);
499
500 Http3CodecUtils.connectionError(ctx, exception, false);
501 return false;
502 }
503 Long value = e.getValue();
504 int keyLen = numBytesForVariableLengthInteger(key);
505 int valueLen = numBytesForVariableLengthInteger(value);
506 writeVariableLengthInteger(out, key, keyLen);
507 writeVariableLengthInteger(out, value, valueLen);
508 }
509 return true;
510 }, promise);
511 }
512
513 private static <T extends Http3Frame> void writeDynamicFrame(ChannelHandlerContext ctx, long type, T frame,
514 BiFunction<T, ByteBuf, Boolean> writer,
515 ChannelPromise promise) {
516 ByteBuf out = ctx.alloc().directBuffer();
517 int initialWriterIndex = out.writerIndex();
518
519 int payloadStartIndex = initialWriterIndex + 16;
520 out.writerIndex(payloadStartIndex);
521
522 if (writer.apply(frame, out)) {
523 int finalWriterIndex = out.writerIndex();
524 int payloadLength = finalWriterIndex - payloadStartIndex;
525 int len = numBytesForVariableLengthInteger(payloadLength);
526 out.writerIndex(payloadStartIndex - len);
527 writeVariableLengthInteger(out, payloadLength, len);
528
529 int typeLength = numBytesForVariableLengthInteger(type);
530 int startIndex = payloadStartIndex - len - typeLength;
531 out.writerIndex(startIndex);
532 writeVariableLengthInteger(out, type, typeLength);
533
534 out.setIndex(startIndex, finalWriterIndex);
535 ctx.write(out, promise);
536 } else {
537
538 out.release();
539 }
540 }
541
542 private void writePushPromiseFrame(ChannelHandlerContext ctx, Http3PushPromiseFrame frame, ChannelPromise promise) {
543 assert qpackAttributes != null;
544 final QuicStreamChannel channel = (QuicStreamChannel) ctx.channel();
545 writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
546 long id = f.id();
547 writeVariableLengthInteger(out, id);
548 qpackEncoder.encodeHeaders(qpackAttributes, out, ctx.alloc(), channel.streamId(), f.headers());
549 return true;
550 }, promise);
551 }
552
553 private static void writeGoAwayFrame(
554 ChannelHandlerContext ctx, Http3GoAwayFrame frame, ChannelPromise promise) {
555 writeFrameWithId(ctx, frame.type(), frame.id(), promise);
556 }
557
558 private static void writeMaxPushIdFrame(
559 ChannelHandlerContext ctx, Http3MaxPushIdFrame frame, ChannelPromise promise) {
560 writeFrameWithId(ctx, frame.type(), frame.id(), promise);
561 }
562
563 private static void writeFrameWithId(ChannelHandlerContext ctx, long type, long id, ChannelPromise promise) {
564 ByteBuf out = ctx.alloc().directBuffer(24);
565 writeVariableLengthInteger(out, type);
566 writeVariableLengthInteger(out, numBytesForVariableLengthInteger(id));
567 writeVariableLengthInteger(out, id);
568 ctx.write(out, promise);
569 }
570
571 private void writeUnknownFrame(
572 ChannelHandlerContext ctx, Http3UnknownFrame frame, ChannelPromise promise) {
573 long type = frame.type();
574 if (Http3CodecUtils.isReservedHttp2FrameType(type)) {
575 Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_FRAME_UNEXPECTED,
576 "Reserved type for HTTP/2 send.");
577 promise.setFailure(exception);
578
579 connectionError(ctx, exception.errorCode(), exception.getMessage(), false);
580 return;
581 }
582 if (!Http3CodecUtils.isReservedFrameType(type)) {
583 Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_FRAME_UNEXPECTED,
584 "Non reserved type for HTTP/3 send.");
585 promise.setFailure(exception);
586 return;
587 }
588 ByteBuf out = ctx.alloc().directBuffer();
589 writeVariableLengthInteger(out, type);
590 writeVariableLengthInteger(out, frame.content().readableBytes());
591 ByteBuf content = frame.content().retain();
592 ctx.write(Unpooled.wrappedUnmodifiableBuffer(out, content), promise);
593 }
594
595 private void initReadResumptionListenerIfRequired(ChannelHandlerContext ctx) {
596 if (readResumptionListener == null) {
597 readResumptionListener = new ReadResumptionListener(ctx, this);
598 }
599 }
600
601 private static void unsupported(ChannelPromise promise) {
602 promise.setFailure(new UnsupportedOperationException());
603 }
604
605 @Override
606 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
607 ctx.bind(localAddress, promise);
608 }
609
610 @Override
611 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
612 SocketAddress localAddress, ChannelPromise promise) {
613 ctx.connect(remoteAddress, localAddress, promise);
614 }
615
616 @Override
617 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
618 ctx.disconnect(promise);
619 }
620
621 @Override
622 public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
623 ctx.close(promise);
624 }
625
626 @Override
627 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
628 ctx.deregister(promise);
629 }
630
631 @Override
632 public void read(ChannelHandlerContext ctx) {
633 assert readResumptionListener != null;
634 if (readResumptionListener.readRequested()) {
635 ctx.read();
636 }
637 }
638
639 @Override
640 public void flush(ChannelHandlerContext ctx) {
641 if (writeResumptionListener != null) {
642 writeResumptionListener.enqueueFlush();
643 } else {
644 ctx.flush();
645 }
646 }
647
648 private static final class ReadResumptionListener
649 implements Runnable, GenericFutureListener<Future<? super QuicStreamChannel>> {
650 private static final int STATE_SUSPENDED = 0b1000_0000;
651 private static final int STATE_READ_PENDING = 0b0100_0000;
652 private static final int STATE_READ_COMPLETE_PENDING = 0b0010_0000;
653
654 private final ChannelHandlerContext ctx;
655 private final Http3FrameCodec codec;
656 private byte state;
657
658 ReadResumptionListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
659 this.ctx = ctx;
660 this.codec = codec;
661 assert codec.qpackAttributes != null;
662 if (!codec.qpackAttributes.dynamicTableDisabled() && !codec.qpackAttributes.decoderStreamAvailable()) {
663 codec.qpackAttributes.whenDecoderStreamAvailable(this);
664 }
665 }
666
667 void suspended() {
668 assert !codec.qpackAttributes.dynamicTableDisabled();
669 setState(STATE_SUSPENDED);
670 }
671
672 boolean readCompleted() {
673 if (hasState(STATE_SUSPENDED)) {
674 setState(STATE_READ_COMPLETE_PENDING);
675 return false;
676 }
677 return true;
678 }
679
680 boolean readRequested() {
681 if (hasState(STATE_SUSPENDED)) {
682 setState(STATE_READ_PENDING);
683 return false;
684 }
685 return true;
686 }
687
688 boolean isSuspended() {
689 return hasState(STATE_SUSPENDED);
690 }
691
692 @Override
693 public void operationComplete(Future<? super QuicStreamChannel> future) {
694 if (future.isSuccess()) {
695 resume();
696 } else {
697 ctx.fireExceptionCaught(future.cause());
698 }
699 }
700
701 @Override
702 public void run() {
703 resume();
704 }
705
706 private void resume() {
707 unsetState(STATE_SUSPENDED);
708 try {
709 codec.channelRead(ctx, Unpooled.EMPTY_BUFFER);
710 if (hasState(STATE_READ_COMPLETE_PENDING)) {
711 unsetState(STATE_READ_COMPLETE_PENDING);
712 codec.channelReadComplete(ctx);
713 }
714 if (hasState(STATE_READ_PENDING)) {
715 unsetState(STATE_READ_PENDING);
716 codec.read(ctx);
717 }
718 } catch (Exception e) {
719 ctx.fireExceptionCaught(e);
720 }
721 }
722
723 private void setState(int toSet) {
724 state |= toSet;
725 }
726
727 private boolean hasState(int toCheck) {
728 return (state & toCheck) == toCheck;
729 }
730
731 private void unsetState(int toUnset) {
732 state &= ~toUnset;
733 }
734 }
735
736 private static final class WriteResumptionListener
737 implements GenericFutureListener<Future<? super QuicStreamChannel>> {
738 private static final Object FLUSH = new Object();
739 private final PendingWriteQueue queue;
740 private final ChannelHandlerContext ctx;
741 private final Http3FrameCodec codec;
742
743 private WriteResumptionListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
744 this.ctx = ctx;
745 this.codec = codec;
746 queue = new PendingWriteQueue(ctx);
747 }
748
749 @Override
750 public void operationComplete(Future<? super QuicStreamChannel> future) {
751 drain();
752 }
753
754 void enqueue(Object msg, ChannelPromise promise) {
755 assert ctx.channel().eventLoop().inEventLoop();
756
757 ReferenceCountUtil.touch(msg);
758 queue.add(msg, promise);
759 }
760
761 void enqueueFlush() {
762 assert ctx.channel().eventLoop().inEventLoop();
763 queue.add(FLUSH, ctx.voidPromise());
764 }
765
766 void drain() {
767 assert ctx.channel().eventLoop().inEventLoop();
768 boolean flushSeen = false;
769 try {
770 for (;;) {
771 Object entry = queue.current();
772 if (entry == null) {
773 break;
774 }
775 if (entry == FLUSH) {
776 flushSeen = true;
777 queue.remove().trySuccess();
778 } else {
779
780 codec.write0(ctx, ReferenceCountUtil.retain(entry), queue.remove());
781 }
782 }
783
784
785 codec.writeResumptionListener = null;
786 } finally {
787 if (flushSeen) {
788 codec.flush(ctx);
789 }
790 }
791 }
792
793 static WriteResumptionListener newListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
794 WriteResumptionListener listener = new WriteResumptionListener(ctx, codec);
795 assert codec.qpackAttributes != null;
796 codec.qpackAttributes.whenEncoderStreamAvailable(listener);
797 return listener;
798 }
799 }
800
801
802
803
804 @FunctionalInterface
805 interface Http3FrameCodecFactory {
806
807
808
809
810
811
812
813
814 ChannelHandler newCodec(Http3FrameTypeValidator validator, Http3RequestStreamCodecState encodeState,
815 Http3RequestStreamCodecState decodeState);
816 }
817 }