1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http3;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelHandler;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelOutboundHandler;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.PendingWriteQueue;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.quic.QuicStreamChannel;
27 import io.netty.handler.codec.quic.QuicStreamFrame;
28 import io.netty.util.ReferenceCountUtil;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.GenericFutureListener;
31 import org.jetbrains.annotations.Nullable;
32
33 import java.net.SocketAddress;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.function.BiFunction;
37
38 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_CANCEL_PUSH_FRAME_MAX_LEN;
39 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_CANCEL_PUSH_FRAME_TYPE;
40 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_DATA_FRAME_TYPE;
41 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_GO_AWAY_FRAME_MAX_LEN;
42 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_GO_AWAY_FRAME_TYPE;
43 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_HEADERS_FRAME_TYPE;
44 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_MAX_PUSH_ID_FRAME_MAX_LEN;
45 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_MAX_PUSH_ID_FRAME_TYPE;
46 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_PUSH_PROMISE_FRAME_TYPE;
47 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_SETTINGS_FRAME_MAX_LEN;
48 import static io.netty.handler.codec.http3.Http3CodecUtils.HTTP3_SETTINGS_FRAME_TYPE;
49 import static io.netty.handler.codec.http3.Http3CodecUtils.numBytesForVariableLengthInteger;
50 import static io.netty.handler.codec.http3.Http3CodecUtils.readVariableLengthInteger;
51 import static io.netty.handler.codec.http3.Http3CodecUtils.writeVariableLengthInteger;
52 import static io.netty.util.internal.ObjectUtil.checkNotNull;
53 import static io.netty.util.internal.ObjectUtil.checkPositive;
54
55
56
57
58 final class Http3FrameCodec extends ByteToMessageDecoder implements ChannelOutboundHandler {
59 private final Http3FrameTypeValidator validator;
60 private final long maxHeaderListSize;
61 private final QpackDecoder qpackDecoder;
62 private final QpackEncoder qpackEncoder;
63 private final Http3RequestStreamCodecState encodeState;
64 private final Http3RequestStreamCodecState decodeState;
65 private final Http3Settings.NonStandardHttp3SettingsValidator nonStandardSettingsValidator;
66 private boolean firstFrame = true;
67 private boolean error;
68 private long type = -1;
69 private int payLoadLength = -1;
70 private QpackAttributes qpackAttributes;
71 private ReadResumptionListener readResumptionListener;
72 private WriteResumptionListener writeResumptionListener;
73
74 static Http3FrameCodecFactory newFactory(QpackDecoder qpackDecoder,
75 long maxHeaderListSize, QpackEncoder qpackEncoder) {
76 checkNotNull(qpackEncoder, "qpackEncoder");
77 checkNotNull(qpackDecoder, "qpackDecoder");
78
79
80 return (validator, encodeState, decodeState,
81 nonStandardSettingsValidator) -> new Http3FrameCodec(validator, qpackDecoder,
82 maxHeaderListSize, qpackEncoder, encodeState, decodeState, nonStandardSettingsValidator);
83 }
84
85 Http3FrameCodec(Http3FrameTypeValidator validator, QpackDecoder qpackDecoder,
86 long maxHeaderListSize, QpackEncoder qpackEncoder, Http3RequestStreamCodecState encodeState,
87 Http3RequestStreamCodecState decodeState,
88 Http3Settings.NonStandardHttp3SettingsValidator nonStandardSettingsValidator) {
89 this.validator = checkNotNull(validator, "validator");
90 this.qpackDecoder = checkNotNull(qpackDecoder, "qpackDecoder");
91 this.maxHeaderListSize = checkPositive(maxHeaderListSize, "maxHeaderListSize");
92 this.qpackEncoder = checkNotNull(qpackEncoder, "qpackEncoder");
93 this.encodeState = checkNotNull(encodeState, "encodeState");
94 this.decodeState = checkNotNull(decodeState, "decodeState");
95 this.nonStandardSettingsValidator = nonStandardSettingsValidator;
96 }
97
98 @Override
99 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
100 qpackAttributes = Http3.getQpackAttributes(ctx.channel().parent());
101 assert qpackAttributes != null;
102
103 initReadResumptionListenerIfRequired(ctx);
104 super.handlerAdded(ctx);
105 }
106
107 @Override
108 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
109 if (writeResumptionListener != null) {
110 writeResumptionListener.drain();
111 }
112 super.channelInactive(ctx);
113 }
114
115 @Override
116 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
117 if (writeResumptionListener != null) {
118
119 writeResumptionListener.drain();
120 }
121 super.handlerRemoved0(ctx);
122 }
123
124 @Override
125 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
126 ByteBuf buffer;
127 if (msg instanceof QuicStreamFrame) {
128 QuicStreamFrame streamFrame = (QuicStreamFrame) msg;
129 buffer = streamFrame.content().retain();
130 streamFrame.release();
131 } else {
132 buffer = (ByteBuf) msg;
133 }
134 super.channelRead(ctx, buffer);
135 }
136
137 @Override
138 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
139 assert readResumptionListener != null;
140 if (readResumptionListener.readCompleted()) {
141 super.channelReadComplete(ctx);
142 }
143 }
144
145 private void connectionError(ChannelHandlerContext ctx, Http3ErrorCode code, String msg, boolean fireException) {
146 error = true;
147 Http3CodecUtils.connectionError(ctx, code, msg, fireException);
148 }
149
150 private void connectionError(ChannelHandlerContext ctx, Http3Exception exception, boolean fireException) {
151 error = true;
152 Http3CodecUtils.connectionError(ctx, exception, fireException);
153 }
154
155 @Override
156 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
157 assert readResumptionListener != null;
158 if (!in.isReadable() || readResumptionListener.isSuspended()) {
159 return;
160 }
161 if (error) {
162 in.skipBytes(in.readableBytes());
163 return;
164 }
165 if (type == -1) {
166 int typeLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
167 if (in.readableBytes() < typeLen) {
168 return;
169 }
170 long localType = readVariableLengthInteger(in, typeLen);
171 if (Http3CodecUtils.isReservedHttp2FrameType(localType)) {
172
173 connectionError(ctx, Http3ErrorCode.H3_FRAME_UNEXPECTED,
174 "Reserved type for HTTP/2 received.", true);
175 return;
176 }
177 try {
178
179 validator.validate(localType, firstFrame);
180 } catch (Http3Exception e) {
181 connectionError(ctx, e, true);
182 return;
183 }
184 type = localType;
185 firstFrame = false;
186 if (!in.isReadable()) {
187 return;
188 }
189 }
190 if (payLoadLength == -1) {
191 int payloadLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
192 assert payloadLen <= 8;
193 if (in.readableBytes() < payloadLen) {
194 return;
195 }
196 long len = readVariableLengthInteger(in, payloadLen);
197 if (len > Integer.MAX_VALUE) {
198 connectionError(ctx, Http3ErrorCode.H3_EXCESSIVE_LOAD,
199 "Received an invalid frame len.", true);
200 return;
201 }
202 payLoadLength = (int) len;
203 }
204 int read = decodeFrame(ctx, type, payLoadLength, in, out);
205 if (read >= 0) {
206 if (read == payLoadLength) {
207 type = -1;
208 payLoadLength = -1;
209 } else {
210 payLoadLength -= read;
211 }
212 }
213 }
214
215 private static int skipBytes(ByteBuf in, int payLoadLength) {
216 in.skipBytes(payLoadLength);
217 return payLoadLength;
218 }
219
220 private int decodeFrame(ChannelHandlerContext ctx, long longType, int payLoadLength, ByteBuf in, List<Object> out) {
221 if (longType > Integer.MAX_VALUE && !Http3CodecUtils.isReservedFrameType(longType)) {
222 return skipBytes(in, payLoadLength);
223 }
224 int type = (int) longType;
225
226 switch (type) {
227 case HTTP3_DATA_FRAME_TYPE:
228
229
230 int readable = in.readableBytes();
231 if (readable == 0 && payLoadLength > 0) {
232 return 0;
233 }
234 int length = Math.min(readable, payLoadLength);
235 out.add(new DefaultHttp3DataFrame(in.readRetainedSlice(length)));
236 return length;
237 case HTTP3_HEADERS_FRAME_TYPE:
238
239
240 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
241
242
243
244 maxHeaderListSize, Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
245 return 0;
246 }
247 assert qpackAttributes != null;
248 if (!qpackAttributes.dynamicTableDisabled() && !qpackAttributes.decoderStreamAvailable()) {
249 assert readResumptionListener != null;
250 readResumptionListener.suspended();
251 return 0;
252 }
253
254 Http3HeadersFrame headersFrame = new DefaultHttp3HeadersFrame();
255 if (decodeHeaders(ctx, headersFrame.headers(), in, payLoadLength, decodeState.receivedFinalHeaders())) {
256 out.add(headersFrame);
257 return payLoadLength;
258 }
259 return -1;
260 case HTTP3_CANCEL_PUSH_FRAME_TYPE:
261
262
263 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
264 HTTP3_CANCEL_PUSH_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
265 return 0;
266 }
267 int pushIdLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
268 out.add(new DefaultHttp3CancelPushFrame(readVariableLengthInteger(in, pushIdLen)));
269 return payLoadLength;
270 case HTTP3_SETTINGS_FRAME_TYPE:
271
272
273
274
275 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength, HTTP3_SETTINGS_FRAME_MAX_LEN,
276 Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
277 return 0;
278 }
279 Http3SettingsFrame settingsFrame = decodeSettings(ctx, in, payLoadLength);
280 if (settingsFrame != null) {
281 out.add(settingsFrame);
282 }
283 return payLoadLength;
284 case HTTP3_PUSH_PROMISE_FRAME_TYPE:
285
286
287 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
288
289
290
291 Math.max(maxHeaderListSize, maxHeaderListSize + 8), Http3ErrorCode.H3_EXCESSIVE_LOAD)) {
292 return 0;
293 }
294
295 assert qpackAttributes != null;
296 if (!qpackAttributes.dynamicTableDisabled() && !qpackAttributes.decoderStreamAvailable()) {
297 assert readResumptionListener != null;
298 readResumptionListener.suspended();
299 return 0;
300 }
301 int readerIdx = in.readerIndex();
302 int pushPromiseIdLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
303 Http3PushPromiseFrame pushPromiseFrame = new DefaultHttp3PushPromiseFrame(
304 readVariableLengthInteger(in, pushPromiseIdLen));
305 if (decodeHeaders(ctx, pushPromiseFrame.headers(), in, payLoadLength - pushPromiseIdLen, false)) {
306 out.add(pushPromiseFrame);
307 return payLoadLength;
308 }
309 in.readerIndex(readerIdx);
310 return -1;
311 case HTTP3_GO_AWAY_FRAME_TYPE:
312
313
314 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
315 HTTP3_GO_AWAY_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
316 return 0;
317 }
318 int idLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
319 out.add(new DefaultHttp3GoAwayFrame(readVariableLengthInteger(in, idLen)));
320 return payLoadLength;
321 case HTTP3_MAX_PUSH_ID_FRAME_TYPE:
322
323
324 if (!enforceMaxPayloadLength(ctx, in, type, payLoadLength,
325 HTTP3_MAX_PUSH_ID_FRAME_MAX_LEN, Http3ErrorCode.H3_FRAME_ERROR)) {
326 return 0;
327 }
328 int pidLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
329 out.add(new DefaultHttp3MaxPushIdFrame(readVariableLengthInteger(in, pidLen)));
330 return payLoadLength;
331 default:
332 if (!Http3CodecUtils.isReservedFrameType(longType)) {
333 return skipBytes(in, payLoadLength);
334 }
335
336
337 if (in.readableBytes() < payLoadLength) {
338 return 0;
339 }
340 out.add(new DefaultHttp3UnknownFrame(longType, in.readRetainedSlice(payLoadLength)));
341 return payLoadLength;
342 }
343 }
344
345 private boolean enforceMaxPayloadLength(
346 ChannelHandlerContext ctx, ByteBuf in, int type, int payLoadLength,
347 long maxPayLoadLength, Http3ErrorCode error) {
348 if (payLoadLength > maxPayLoadLength) {
349 connectionError(ctx, error,
350 "Received an invalid frame len " + payLoadLength + " for frame of type " + type + '.', true);
351 return false;
352 }
353 return in.readableBytes() >= payLoadLength;
354 }
355
356 @Nullable
357 private Http3SettingsFrame decodeSettings(ChannelHandlerContext ctx, ByteBuf in, int payLoadLength) {
358 Http3SettingsFrame settingsFrame = new DefaultHttp3SettingsFrame(
359 new Http3Settings(nonStandardSettingsValidator));
360 while (payLoadLength > 0) {
361 int keyLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
362 long key = readVariableLengthInteger(in, keyLen);
363 if (Http3CodecUtils.isReservedHttp2Setting(key)) {
364
365
366 connectionError(ctx, Http3ErrorCode.H3_SETTINGS_ERROR,
367 "Received a settings key that is reserved for HTTP/2.", true);
368 return null;
369 }
370 payLoadLength -= keyLen;
371 int valueLen = numBytesForVariableLengthInteger(in.getByte(in.readerIndex()));
372 long value = readVariableLengthInteger(in, valueLen);
373 payLoadLength -= valueLen;
374
375 if (settingsFrame.put(key, value) != null) {
376
377
378 connectionError(ctx, Http3ErrorCode.H3_SETTINGS_ERROR,
379 "Received a duplicate settings key.", true);
380 return null;
381 }
382 }
383 return settingsFrame;
384 }
385
386
387
388
389
390
391
392
393
394
395
396
397
398 private boolean decodeHeaders(ChannelHandlerContext ctx, Http3Headers headers, ByteBuf in, int length,
399 boolean trailer) {
400 try {
401 Http3HeadersSink sink = new Http3HeadersSink(headers, maxHeaderListSize, true, trailer);
402 assert qpackAttributes != null;
403 assert readResumptionListener != null;
404 if (qpackDecoder.decode(qpackAttributes,
405 ((QuicStreamChannel) ctx.channel()).streamId(), in, length, sink, readResumptionListener)) {
406
407 sink.finish();
408 return true;
409 }
410 readResumptionListener.suspended();
411 } catch (Http3Exception e) {
412 connectionError(ctx, e.errorCode(), e.getMessage(), true);
413 } catch (QpackException e) {
414
415 connectionError(ctx, Http3ErrorCode.QPACK_DECOMPRESSION_FAILED,
416 "Decompression of header block failed.", true);
417 } catch (Http3HeadersValidationException e) {
418 error = true;
419 ctx.fireExceptionCaught(e);
420
421
422 Http3CodecUtils.streamError(ctx, Http3ErrorCode.H3_MESSAGE_ERROR);
423 }
424 return false;
425 }
426
427 @Override
428 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
429 assert qpackAttributes != null;
430 if (writeResumptionListener != null) {
431 writeResumptionListener.enqueue(msg, promise);
432 return;
433 }
434
435 if ((msg instanceof Http3HeadersFrame || msg instanceof Http3PushPromiseFrame) &&
436 !qpackAttributes.dynamicTableDisabled() && !qpackAttributes.encoderStreamAvailable()) {
437 writeResumptionListener = WriteResumptionListener.newListener(ctx, this);
438 writeResumptionListener.enqueue(msg, promise);
439 return;
440 }
441
442 write0(ctx, msg, promise);
443 }
444
445 private void write0(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
446 try {
447 if (msg instanceof Http3DataFrame) {
448 writeDataFrame(ctx, (Http3DataFrame) msg, promise);
449 } else if (msg instanceof Http3HeadersFrame) {
450 writeHeadersFrame(ctx, (Http3HeadersFrame) msg, promise);
451 } else if (msg instanceof Http3CancelPushFrame) {
452 writeCancelPushFrame(ctx, (Http3CancelPushFrame) msg, promise);
453 } else if (msg instanceof Http3SettingsFrame) {
454 writeSettingsFrame(ctx, (Http3SettingsFrame) msg, promise);
455 } else if (msg instanceof Http3PushPromiseFrame) {
456 writePushPromiseFrame(ctx, (Http3PushPromiseFrame) msg, promise);
457 } else if (msg instanceof Http3GoAwayFrame) {
458 writeGoAwayFrame(ctx, (Http3GoAwayFrame) msg, promise);
459 } else if (msg instanceof Http3MaxPushIdFrame) {
460 writeMaxPushIdFrame(ctx, (Http3MaxPushIdFrame) msg, promise);
461 } else if (msg instanceof Http3UnknownFrame) {
462 writeUnknownFrame(ctx, (Http3UnknownFrame) msg, promise);
463 } else {
464 unsupported(promise);
465 }
466 } finally {
467 ReferenceCountUtil.release(msg);
468 }
469 }
470
471 private static void writeDataFrame(
472 ChannelHandlerContext ctx, Http3DataFrame frame, ChannelPromise promise) {
473 ByteBuf out = ctx.alloc().directBuffer(16);
474 writeVariableLengthInteger(out, frame.type());
475 writeVariableLengthInteger(out, frame.content().readableBytes());
476 ByteBuf content = frame.content().retain();
477 ctx.write(Unpooled.wrappedUnmodifiableBuffer(out, content), promise);
478 }
479
480 private void writeHeadersFrame(ChannelHandlerContext ctx, Http3HeadersFrame frame, ChannelPromise promise) {
481 assert qpackAttributes != null;
482 final QuicStreamChannel channel = (QuicStreamChannel) ctx.channel();
483 writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
484 qpackEncoder.encodeHeaders(qpackAttributes, out, ctx.alloc(), channel.streamId(), f.headers());
485 return true;
486 }, promise);
487 }
488
489 private static void writeCancelPushFrame(
490 ChannelHandlerContext ctx, Http3CancelPushFrame frame, ChannelPromise promise) {
491 writeFrameWithId(ctx, frame.type(), frame.id(), promise);
492 }
493
494 private static void writeSettingsFrame(
495 ChannelHandlerContext ctx, Http3SettingsFrame frame, ChannelPromise promise) {
496 writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
497 for (Map.Entry<Long, Long> e : f) {
498 Long key = e.getKey();
499 if (Http3CodecUtils.isReservedHttp2Setting(key)) {
500 Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_SETTINGS_ERROR,
501 "Received a settings key that is reserved for HTTP/2.");
502 promise.setFailure(exception);
503
504 Http3CodecUtils.connectionError(ctx, exception, false);
505 return false;
506 }
507 Long value = e.getValue();
508 int keyLen = numBytesForVariableLengthInteger(key);
509 int valueLen = numBytesForVariableLengthInteger(value);
510 writeVariableLengthInteger(out, key, keyLen);
511 writeVariableLengthInteger(out, value, valueLen);
512 }
513 return true;
514 }, promise);
515 }
516
517 private static <T extends Http3Frame> void writeDynamicFrame(ChannelHandlerContext ctx, long type, T frame,
518 BiFunction<T, ByteBuf, Boolean> writer,
519 ChannelPromise promise) {
520 ByteBuf out = ctx.alloc().directBuffer();
521 int initialWriterIndex = out.writerIndex();
522
523 int payloadStartIndex = initialWriterIndex + 16;
524 out.writerIndex(payloadStartIndex);
525
526 if (writer.apply(frame, out)) {
527 int finalWriterIndex = out.writerIndex();
528 int payloadLength = finalWriterIndex - payloadStartIndex;
529 int len = numBytesForVariableLengthInteger(payloadLength);
530 out.writerIndex(payloadStartIndex - len);
531 writeVariableLengthInteger(out, payloadLength, len);
532
533 int typeLength = numBytesForVariableLengthInteger(type);
534 int startIndex = payloadStartIndex - len - typeLength;
535 out.writerIndex(startIndex);
536 writeVariableLengthInteger(out, type, typeLength);
537
538 out.setIndex(startIndex, finalWriterIndex);
539 ctx.write(out, promise);
540 } else {
541
542 out.release();
543 }
544 }
545
546 private void writePushPromiseFrame(ChannelHandlerContext ctx, Http3PushPromiseFrame frame, ChannelPromise promise) {
547 assert qpackAttributes != null;
548 final QuicStreamChannel channel = (QuicStreamChannel) ctx.channel();
549 writeDynamicFrame(ctx, frame.type(), frame, (f, out) -> {
550 long id = f.id();
551 writeVariableLengthInteger(out, id);
552 qpackEncoder.encodeHeaders(qpackAttributes, out, ctx.alloc(), channel.streamId(), f.headers());
553 return true;
554 }, promise);
555 }
556
557 private static void writeGoAwayFrame(
558 ChannelHandlerContext ctx, Http3GoAwayFrame frame, ChannelPromise promise) {
559 writeFrameWithId(ctx, frame.type(), frame.id(), promise);
560 }
561
562 private static void writeMaxPushIdFrame(
563 ChannelHandlerContext ctx, Http3MaxPushIdFrame frame, ChannelPromise promise) {
564 writeFrameWithId(ctx, frame.type(), frame.id(), promise);
565 }
566
567 private static void writeFrameWithId(ChannelHandlerContext ctx, long type, long id, ChannelPromise promise) {
568 ByteBuf out = ctx.alloc().directBuffer(24);
569 writeVariableLengthInteger(out, type);
570 writeVariableLengthInteger(out, numBytesForVariableLengthInteger(id));
571 writeVariableLengthInteger(out, id);
572 ctx.write(out, promise);
573 }
574
575 private void writeUnknownFrame(
576 ChannelHandlerContext ctx, Http3UnknownFrame frame, ChannelPromise promise) {
577 long type = frame.type();
578 if (Http3CodecUtils.isReservedHttp2FrameType(type)) {
579 Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_FRAME_UNEXPECTED,
580 "Reserved type for HTTP/2 send.");
581 promise.setFailure(exception);
582
583 connectionError(ctx, exception.errorCode(), exception.getMessage(), false);
584 return;
585 }
586 if (!Http3CodecUtils.isReservedFrameType(type)) {
587 Http3Exception exception = new Http3Exception(Http3ErrorCode.H3_FRAME_UNEXPECTED,
588 "Non reserved type for HTTP/3 send.");
589 promise.setFailure(exception);
590 return;
591 }
592 ByteBuf out = ctx.alloc().directBuffer();
593 writeVariableLengthInteger(out, type);
594 writeVariableLengthInteger(out, frame.content().readableBytes());
595 ByteBuf content = frame.content().retain();
596 ctx.write(Unpooled.wrappedUnmodifiableBuffer(out, content), promise);
597 }
598
599 private void initReadResumptionListenerIfRequired(ChannelHandlerContext ctx) {
600 if (readResumptionListener == null) {
601 readResumptionListener = new ReadResumptionListener(ctx, this);
602 }
603 }
604
605 private static void unsupported(ChannelPromise promise) {
606 promise.setFailure(new UnsupportedOperationException());
607 }
608
609 @Override
610 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
611 ctx.bind(localAddress, promise);
612 }
613
614 @Override
615 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
616 SocketAddress localAddress, ChannelPromise promise) {
617 ctx.connect(remoteAddress, localAddress, promise);
618 }
619
620 @Override
621 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
622 ctx.disconnect(promise);
623 }
624
625 @Override
626 public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
627 ctx.close(promise);
628 }
629
630 @Override
631 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
632 ctx.deregister(promise);
633 }
634
635 @Override
636 public void read(ChannelHandlerContext ctx) {
637 assert readResumptionListener != null;
638 if (readResumptionListener.readRequested()) {
639 ctx.read();
640 }
641 }
642
643 @Override
644 public void flush(ChannelHandlerContext ctx) {
645 if (writeResumptionListener != null) {
646 writeResumptionListener.enqueueFlush();
647 } else {
648 ctx.flush();
649 }
650 }
651
652 private static final class ReadResumptionListener
653 implements Runnable, GenericFutureListener<Future<? super QuicStreamChannel>> {
654 private static final int STATE_SUSPENDED = 0b1000_0000;
655 private static final int STATE_READ_PENDING = 0b0100_0000;
656 private static final int STATE_READ_COMPLETE_PENDING = 0b0010_0000;
657
658 private final ChannelHandlerContext ctx;
659 private final Http3FrameCodec codec;
660 private byte state;
661
662 ReadResumptionListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
663 this.ctx = ctx;
664 this.codec = codec;
665 assert codec.qpackAttributes != null;
666 if (!codec.qpackAttributes.dynamicTableDisabled() && !codec.qpackAttributes.decoderStreamAvailable()) {
667 codec.qpackAttributes.whenDecoderStreamAvailable(this);
668 }
669 }
670
671 void suspended() {
672 assert !codec.qpackAttributes.dynamicTableDisabled();
673 setState(STATE_SUSPENDED);
674 }
675
676 boolean readCompleted() {
677 if (hasState(STATE_SUSPENDED)) {
678 setState(STATE_READ_COMPLETE_PENDING);
679 return false;
680 }
681 return true;
682 }
683
684 boolean readRequested() {
685 if (hasState(STATE_SUSPENDED)) {
686 setState(STATE_READ_PENDING);
687 return false;
688 }
689 return true;
690 }
691
692 boolean isSuspended() {
693 return hasState(STATE_SUSPENDED);
694 }
695
696 @Override
697 public void operationComplete(Future<? super QuicStreamChannel> future) {
698 if (future.isSuccess()) {
699 resume();
700 } else {
701 ctx.fireExceptionCaught(future.cause());
702 }
703 }
704
705 @Override
706 public void run() {
707 resume();
708 }
709
710 private void resume() {
711 unsetState(STATE_SUSPENDED);
712 try {
713 codec.channelRead(ctx, Unpooled.EMPTY_BUFFER);
714 if (hasState(STATE_READ_COMPLETE_PENDING)) {
715 unsetState(STATE_READ_COMPLETE_PENDING);
716 codec.channelReadComplete(ctx);
717 }
718 if (hasState(STATE_READ_PENDING)) {
719 unsetState(STATE_READ_PENDING);
720 codec.read(ctx);
721 }
722 } catch (Exception e) {
723 ctx.fireExceptionCaught(e);
724 }
725 }
726
727 private void setState(int toSet) {
728 state |= toSet;
729 }
730
731 private boolean hasState(int toCheck) {
732 return (state & toCheck) == toCheck;
733 }
734
735 private void unsetState(int toUnset) {
736 state &= ~toUnset;
737 }
738 }
739
740 private static final class WriteResumptionListener
741 implements GenericFutureListener<Future<? super QuicStreamChannel>> {
742 private static final Object FLUSH = new Object();
743 private final PendingWriteQueue queue;
744 private final ChannelHandlerContext ctx;
745 private final Http3FrameCodec codec;
746
747 private WriteResumptionListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
748 this.ctx = ctx;
749 this.codec = codec;
750 queue = new PendingWriteQueue(ctx);
751 }
752
753 @Override
754 public void operationComplete(Future<? super QuicStreamChannel> future) {
755 drain();
756 }
757
758 void enqueue(Object msg, ChannelPromise promise) {
759 assert ctx.channel().eventLoop().inEventLoop();
760
761 ReferenceCountUtil.touch(msg);
762 queue.add(msg, promise);
763 }
764
765 void enqueueFlush() {
766 assert ctx.channel().eventLoop().inEventLoop();
767 queue.add(FLUSH, ctx.voidPromise());
768 }
769
770 void drain() {
771 assert ctx.channel().eventLoop().inEventLoop();
772 boolean flushSeen = false;
773 try {
774 for (;;) {
775 Object entry = queue.current();
776 if (entry == null) {
777 break;
778 }
779 if (entry == FLUSH) {
780 flushSeen = true;
781 queue.remove().trySuccess();
782 } else {
783
784 codec.write0(ctx, ReferenceCountUtil.retain(entry), queue.remove());
785 }
786 }
787
788
789 codec.writeResumptionListener = null;
790 } finally {
791 if (flushSeen) {
792 codec.flush(ctx);
793 }
794 }
795 }
796
797 static WriteResumptionListener newListener(ChannelHandlerContext ctx, Http3FrameCodec codec) {
798 WriteResumptionListener listener = new WriteResumptionListener(ctx, codec);
799 assert codec.qpackAttributes != null;
800 codec.qpackAttributes.whenEncoderStreamAvailable(listener);
801 return listener;
802 }
803 }
804
805
806
807
808 @FunctionalInterface
809 interface Http3FrameCodecFactory {
810
811
812
813
814
815
816
817
818 ChannelHandler newCodec(Http3FrameTypeValidator validator, Http3RequestStreamCodecState encodeState,
819 Http3RequestStreamCodecState decodeState,
820 Http3Settings.NonStandardHttp3SettingsValidator nonStandardSettingsValidator);
821 }
822 }