1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http2;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.util.Resource;
20 import io.netty5.channel.Channel;
21 import io.netty5.channel.ChannelHandler;
22 import io.netty5.channel.ChannelHandlerContext;
23 import io.netty5.handler.codec.UnsupportedMessageTypeException;
24 import io.netty5.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
25 import io.netty5.handler.codec.http2.Http2Connection.PropertyKey;
26 import io.netty5.handler.codec.http2.Http2Stream.State;
27 import io.netty5.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
28 import io.netty5.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
29 import io.netty5.util.ReferenceCounted;
30 import io.netty5.util.collection.IntObjectHashMap;
31 import io.netty5.util.collection.IntObjectMap;
32 import io.netty5.util.concurrent.Future;
33 import io.netty5.util.concurrent.Promise;
34 import io.netty5.util.internal.UnstableApi;
35 import io.netty5.util.internal.logging.InternalLogger;
36 import io.netty5.util.internal.logging.InternalLoggerFactory;
37
38 import static io.netty5.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
39 import static io.netty5.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
40 import static io.netty5.handler.codec.http2.Http2Error.NO_ERROR;
41 import static io.netty5.util.internal.logging.InternalLogLevel.DEBUG;
42 import static java.nio.charset.StandardCharsets.US_ASCII;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 @UnstableApi
147 public class Http2FrameCodec extends Http2ConnectionHandler {
148
private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);

// Connection property key under which an Http2Stream stores its DefaultHttp2FrameStream.
protected final PropertyKey streamKey;
// Connection property key set to true on the HTTP upgrade stream; read back by consumeBytes.
private final PropertyKey upgradeKey;

// Initial window size from the initial settings; may be null, in which case the
// connection window is never expanded by tryExpandConnectionFlowControlWindow.
private final Integer initialFlowControlWindowSize;

// Assigned in handlerAdded0; used by listeners and helpers that are not handed a context.
ChannelHandlerContext ctx;

// Number of locally initiated HEADERS/PUSH_PROMISE writes whose future has not completed yet.
// isGracefulShutdownComplete() requires this to be zero.
private int numBufferedStreams;
// Frame streams that already received an id but are not yet attached to an Http2Stream,
// keyed by stream id. Entries are removed in onStreamAdded or when the initial write fails.
private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
        new IntObjectHashMap<>(8);
164
165 Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings,
166 boolean decoupleCloseAndGoAway, boolean flushPreface) {
167 super(decoder, encoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
168
169 decoder.frameListener(new FrameListener());
170 connection().addListener(new ConnectionListener());
171 connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
172 streamKey = connection().newKey();
173 upgradeKey = connection().newKey();
174 initialFlowControlWindowSize = initialSettings.initialWindowSize();
175 }
176
177
178
179
/**
 * Creates a new frame stream object that has no id assigned yet (its id is {@code -1})
 * and is not attached to any {@link Http2Stream}.
 *
 * @return a fresh {@link DefaultHttp2FrameStream}
 */
DefaultHttp2FrameStream newStream() {
    return new DefaultHttp2FrameStream();
}
183
184
185
186
187
188
189 final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
190 assert ctx.executor().inEventLoop();
191
192 if (connection().numActiveStreams() > 0) {
193 connection().forEachActiveStream(stream -> {
194 try {
195 return streamVisitor.visit(stream.getProperty(streamKey));
196 } catch (Throwable cause) {
197 onError(ctx, false, cause);
198 return false;
199 }
200 });
201 }
202 }
203
204
205
206
207
208
/**
 * Returns how many frame streams have been assigned an id but are still waiting in
 * {@code frameStreamToInitializeMap} to be attached to an {@link Http2Stream}.
 *
 * @return the current size of the pending-initialization map
 */
int numInitializingStreams() {
    return frameStreamToInitializeMap.size();
}
212
213 @Override
214 public void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
215 super.handlerAdded0(ctx);
216 this.ctx = ctx;
217
218
219 Http2Connection connection = connection();
220 if (connection.isServer()) {
221 tryExpandConnectionFlowControlWindow(connection);
222 }
223 }
224
225 private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
226 if (initialFlowControlWindowSize != null) {
227
228
229 Http2Stream connectionStream = connection.connectionStream();
230 Http2LocalFlowController localFlowController = connection.local().flowController();
231 final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
232
233 if (delta > 0) {
234
235 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
236 flush(ctx);
237 }
238 }
239 }
240
241
242
243
244
245 @Override
246 public final void channelInboundEvent(final ChannelHandlerContext ctx, final Object evt) throws Exception {
247 if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
248
249 tryExpandConnectionFlowControlWindow(connection());
250
251
252
253
254 ctx.executor().execute(() -> ctx.fireChannelInboundEvent(evt));
255 } else if (evt instanceof UpgradeEvent) {
256 try (UpgradeEvent upgrade = (UpgradeEvent) evt) {
257
258 ctx.fireChannelInboundEvent(upgrade.copy());
259 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
260 if (stream.getProperty(streamKey) == null) {
261
262
263
264 onStreamActive0(stream);
265 }
266 upgrade.upgradeRequest().headers().setInt(
267 HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
268 stream.setProperty(upgradeKey, true);
269 InboundHttpToHttp2Adapter.handle(
270 ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest());
271 }
272 } else {
273 ctx.fireChannelInboundEvent(evt);
274 }
275 }
276
277
278
279
280
281 @Override
282 public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
283 if (msg instanceof Http2DataFrame) {
284 Http2DataFrame dataFrame = (Http2DataFrame) msg;
285 return encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
286 dataFrame.padding(), dataFrame.isEndStream());
287 } else if (msg instanceof Http2HeadersFrame) {
288 return writeHeadersFrame(ctx, (Http2HeadersFrame) msg);
289 } else if (msg instanceof Http2WindowUpdateFrame) {
290 Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
291 Http2FrameStream frameStream = frame.stream();
292
293
294 try {
295 if (frameStream == null) {
296 increaseInitialConnectionWindow(frame.windowSizeIncrement());
297 } else {
298 consumeBytes(frameStream.id(), frame.windowSizeIncrement());
299 }
300 return ctx.newSucceededFuture();
301 } catch (Throwable t) {
302 return ctx.newFailedFuture(t);
303 }
304 } else if (msg instanceof Http2ResetFrame) {
305 Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
306 int id = rstFrame.stream().id();
307
308
309 if (connection().streamMayHaveExisted(id)) {
310 return encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode());
311 } else {
312 Resource.dispose(rstFrame);
313 return ctx.newFailedFuture(Http2Exception.streamError(
314 rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed"));
315 }
316 } else if (msg instanceof Http2PingFrame) {
317 Http2PingFrame frame = (Http2PingFrame) msg;
318 return encoder().writePing(ctx, frame.ack(), frame.content());
319 } else if (msg instanceof Http2SettingsFrame) {
320 return encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings());
321 } else if (msg instanceof Http2SettingsAckFrame) {
322
323
324 return encoder().writeSettingsAck(ctx);
325 } else if (msg instanceof Http2GoAwayFrame) {
326 return writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg);
327 } else if (msg instanceof Http2PushPromiseFrame) {
328 Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg;
329 return writePushPromise(ctx, pushPromiseFrame);
330 } else if (msg instanceof Http2PriorityFrame) {
331 Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg;
332 return encoder().writePriority(ctx, priorityFrame.stream().id(), priorityFrame.streamDependency(),
333 priorityFrame.weight(), priorityFrame.exclusive());
334 } else if (msg instanceof Http2UnknownFrame) {
335 Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
336 return encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
337 unknownFrame.flags(), unknownFrame.content());
338 } else if (!(msg instanceof Http2Frame)) {
339 return ctx.write(msg);
340 } else {
341 Resource.dispose(msg);
342 return ctx.newFailedFuture(new UnsupportedMessageTypeException(msg));
343 }
344 }
345
346 private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
347
348 connection().local().flowController().incrementWindowSize(connection().connectionStream(), deltaBytes);
349 }
350
351 final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
352 Http2Stream stream = connection().stream(streamId);
353
354
355 if (stream != null && streamId == HTTP_UPGRADE_STREAM_ID) {
356 Boolean upgraded = stream.getProperty(upgradeKey);
357 if (Boolean.TRUE.equals(upgraded)) {
358 return false;
359 }
360 }
361
362 return connection().local().flowController().consumeBytes(stream, bytes);
363 }
364
365 private Future<Void> writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame) {
366 if (frame.lastStreamId() > -1) {
367 frame.close();
368 return ctx.newFailedFuture(new IllegalArgumentException("Last stream id must not be set on GOAWAY frame"));
369 }
370
371 int lastStreamCreated = connection().remote().lastStreamCreated();
372 long lastStreamId = lastStreamCreated + (long) frame.extraStreamIds() * 2;
373
374 if (lastStreamId > Integer.MAX_VALUE) {
375 lastStreamId = Integer.MAX_VALUE;
376 }
377 return goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content());
378 }
379
380 private Future<Void> writeHeadersFrame(final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame) {
381
382 if (isStreamIdValid(headersFrame.stream().id())) {
383 return encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(),
384 headersFrame.padding(), headersFrame.isEndStream());
385 } else {
386 Future<Void> future = initializeNewStream(ctx, (DefaultHttp2FrameStream) headersFrame.stream());
387 if (future == null) {
388 final int streamId = headersFrame.stream().id();
389
390 future = encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
391 headersFrame.isEndStream());
392
393 if (!future.isDone()) {
394 numBufferedStreams++;
395
396
397 future.addListener(channelFuture -> {
398 numBufferedStreams--;
399
400 handleHeaderFuture(channelFuture, streamId);
401 });
402 } else {
403 handleHeaderFuture(future, streamId);
404 }
405 }
406 return future;
407 }
408 }
409
410 private Future<Void> writePushPromise(final ChannelHandlerContext ctx, Http2PushPromiseFrame pushPromiseFrame) {
411 if (isStreamIdValid(pushPromiseFrame.pushStream().id())) {
412 return encoder().writePushPromise(ctx, pushPromiseFrame.stream().id(), pushPromiseFrame.pushStream().id(),
413 pushPromiseFrame.http2Headers(), pushPromiseFrame.padding());
414 } else {
415 Future<Void> future = initializeNewStream(ctx, (DefaultHttp2FrameStream) pushPromiseFrame.pushStream());
416 if (future == null) {
417 final int streamId = pushPromiseFrame.stream().id();
418 future = encoder().writePushPromise(ctx, streamId, pushPromiseFrame.pushStream().id(),
419 pushPromiseFrame.http2Headers(), pushPromiseFrame.padding());
420
421 if (future.isDone()) {
422 handleHeaderFuture(future, streamId);
423 } else {
424 numBufferedStreams++;
425
426
427 future.addListener(f -> {
428 numBufferedStreams--;
429 handleHeaderFuture(f, streamId);
430 });
431 }
432 return future;
433 }
434 return future;
435 }
436 }
437
438 private Future<Void> initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream) {
439 final Http2Connection connection = connection();
440 final int streamId = connection.local().incrementAndGetNextStreamId();
441 if (streamId < 0) {
442 Future<Void> f = ctx.newFailedFuture(new Http2NoMoreStreamIdsException());
443
444
445
446 onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE :
447 Integer.MAX_VALUE - 1, NO_ERROR.code(),
448 ctx.bufferAllocator().copyOf("Stream IDs exhausted on local stream creation", US_ASCII).send()));
449
450 return f;
451 }
452 http2FrameStream.id = streamId;
453
454
455
456
457
458
459 Object old = frameStreamToInitializeMap.put(streamId, http2FrameStream);
460
461
462 assert old == null;
463 return null;
464 }
465
466 private void handleHeaderFuture(Future<?> channelFuture, int streamId) {
467 if (channelFuture.isFailed()) {
468 frameStreamToInitializeMap.remove(streamId);
469 }
470 }
471
472 private void onStreamActive0(Http2Stream stream) {
473 if (stream.id() != HTTP_UPGRADE_STREAM_ID &&
474 connection().local().isValidStreamId(stream.id())) {
475 return;
476 }
477
478 DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
479 onHttp2StreamStateChanged(ctx, stream2);
480 }
481
482 private final class ConnectionListener extends Http2ConnectionAdapter {
483 @Override
484 public void onStreamAdded(Http2Stream stream) {
485 DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());
486
487 if (frameStream != null) {
488 frameStream.setStreamAndProperty(streamKey, stream);
489 }
490 }
491
492 @Override
493 public void onStreamActive(Http2Stream stream) {
494 onStreamActive0(stream);
495 }
496
497 @Override
498 public void onStreamClosed(Http2Stream stream) {
499 onHttp2StreamStateChanged0(stream);
500 }
501
502 @Override
503 public void onStreamHalfClosed(Http2Stream stream) {
504 onHttp2StreamStateChanged0(stream);
505 }
506
507 private void onHttp2StreamStateChanged0(Http2Stream stream) {
508 DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
509 if (stream2 != null) {
510 onHttp2StreamStateChanged(ctx, stream2);
511 }
512 }
513 }
514
/**
 * Forwards inbound connection errors through the pipeline before handing the error to the
 * super class. Outbound errors are not fired into the pipeline here.
 *
 * @param ctx the handler context
 * @param outbound {@code true} if the error was caused by an outbound operation
 * @param cause the original failure
 * @param http2Ex the {@link Http2Exception} passed on to the super class
 */
@Override
protected void onConnectionError(
        ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
    if (!outbound) {
        // Give handlers further down the pipeline the chance to see the error first.
        ctx.fireChannelExceptionCaught(cause);
    }
    super.onConnectionError(ctx, outbound, cause, http2Ex);
}
527
528
529
530
531
532 @Override
533 protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
534 Http2Exception.StreamException streamException) {
535 int streamId = streamException.streamId();
536 Http2Stream connectionStream = connection().stream(streamId);
537 if (connectionStream == null) {
538 onHttp2UnknownStreamError(ctx, cause, streamException);
539
540 super.onStreamError(ctx, outbound, cause, streamException);
541 return;
542 }
543
544 Http2FrameStream stream = connectionStream.getProperty(streamKey);
545 if (stream == null) {
546 LOG.warn("Stream exception thrown without stream object attached.", cause);
547
548 super.onStreamError(ctx, outbound, cause, streamException);
549 return;
550 }
551
552 if (!outbound) {
553
554 onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
555 }
556 }
557
/**
 * Logs, at DEBUG level, a stream exception raised for a stream id the connection does not know.
 * This method only logs; it does not propagate the exception itself.
 *
 * @param ctx unused
 * @param cause the original failure, logged alongside the message
 * @param streamException the stream exception providing the stream id
 */
private static void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx,
        Throwable cause, Http2Exception.StreamException streamException) {
    LOG.log(DEBUG, "Stream exception thrown for unknown stream {}.", streamException.streamId(), cause);
}
567
568 @Override
569 protected final boolean isGracefulShutdownComplete() {
570 return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
571 }
572
573 private final class FrameListener implements Http2FrameListener {
574
575 @Override
576 public void onUnknownFrame(
577 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, Buffer payload) {
578 if (streamId == 0) {
579
580 return;
581 }
582 onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
583 .stream(requireStream(streamId)).copy());
584 }
585
586 @Override
587 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
588 onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
589 }
590
591 @Override
592 public void onPingRead(ChannelHandlerContext ctx, long data) {
593 onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
594 }
595
596 @Override
597 public void onPingAckRead(ChannelHandlerContext ctx, long data) {
598 onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
599 }
600
601 @Override
602 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
603 onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
604 }
605
606 @Override
607 public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
608 if (streamId == 0) {
609
610 return;
611 }
612 onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
613 }
614
615 @Override
616 public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
617 Http2Headers headers, int streamDependency, short weight, boolean
618 exclusive, int padding, boolean endStream) {
619 onHeadersRead(ctx, streamId, headers, padding, endStream);
620 }
621
622 @Override
623 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
624 int padding, boolean endOfStream) {
625 onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
626 .stream(requireStream(streamId)));
627 }
628
629 @Override
630 public int onDataRead(ChannelHandlerContext ctx, int streamId, Buffer data, int padding,
631 boolean endOfStream) {
632 onHttp2Frame(ctx, new DefaultHttp2DataFrame(data.send(), endOfStream, padding)
633 .stream(requireStream(streamId)));
634
635 return 0;
636 }
637
638 @Override
639 public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, Buffer debugData) {
640 onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData.send()));
641 }
642
643 @Override
644 public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
645 short weight, boolean exclusive) {
646
647 Http2Stream stream = connection().stream(streamId);
648 if (stream == null) {
649
650 return;
651 }
652 onHttp2Frame(ctx, new DefaultHttp2PriorityFrame(streamDependency, weight, exclusive)
653 .stream(requireStream(streamId)));
654 }
655
656 @Override
657 public void onSettingsAckRead(ChannelHandlerContext ctx) {
658 onHttp2Frame(ctx, Http2SettingsAckFrame.INSTANCE);
659 }
660
661 @Override
662 public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
663 Http2Headers headers, int padding) {
664 onHttp2Frame(ctx, new DefaultHttp2PushPromiseFrame(headers, padding, promisedStreamId)
665 .pushStream(new DefaultHttp2FrameStream()
666 .setStreamAndProperty(streamKey, connection().stream(promisedStreamId)))
667 .stream(requireStream(streamId)));
668 }
669
670 private Http2FrameStream requireStream(int streamId) {
671 Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
672 if (stream == null) {
673 throw new IllegalStateException("Stream object required for identifier: " + streamId);
674 }
675 return stream;
676 }
677 }
678
/**
 * Fires the stream's pre-built writability-changed event as an inbound event.
 *
 * @param ctx the handler context
 * @param stream the frame stream whose writability changed
 * @param writable unused
 */
private void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream,
        @SuppressWarnings("unused") boolean writable) {
    ctx.fireChannelInboundEvent(stream.writabilityChanged);
}
683
/**
 * Fires the stream's pre-built state-changed event as an inbound event.
 *
 * @param ctx the handler context
 * @param stream the frame stream whose state changed
 */
void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
    ctx.fireChannelInboundEvent(stream.stateChanged);
}
687
/**
 * Passes a frame object to the next inbound handler via {@code fireChannelRead}.
 *
 * @param ctx the handler context
 * @param frame the frame to deliver
 */
void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
    ctx.fireChannelRead(frame);
}
691
/**
 * Reports a stream-level exception to the pipeline via {@code fireChannelExceptionCaught}.
 *
 * @param ctx the handler context
 * @param cause the exception describing the failed frame stream
 */
void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
    ctx.fireChannelExceptionCaught(cause);
}
695
696 private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
697 @Override
698 public void writabilityChanged(Http2Stream stream) {
699 DefaultHttp2FrameStream frameStream = stream.getProperty(streamKey);
700 if (frameStream == null) {
701 return;
702 }
703 onHttp2StreamWritabilityChanged(
704 ctx, frameStream, connection().remote().flowController().isWritable(stream));
705 }
706 }
707
708
709
710
711
712 static class DefaultHttp2FrameStream implements Http2FrameStream {
713
714 private volatile int id = -1;
715 private volatile Http2Stream stream;
716
717 final Http2FrameStreamEvent stateChanged = Http2FrameStreamEvent.stateChanged(this);
718 final Http2FrameStreamEvent writabilityChanged = Http2FrameStreamEvent.writabilityChanged(this);
719
720 Channel attachment;
721
722 DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
723 assert id == -1 || stream.id() == id;
724 this.stream = stream;
725 stream.setProperty(streamKey, this);
726 return this;
727 }
728
729 @Override
730 public int id() {
731 Http2Stream stream = this.stream;
732 return stream == null ? id : stream.id();
733 }
734
735 @Override
736 public State state() {
737 Http2Stream stream = this.stream;
738 return stream == null ? State.IDLE : stream.state();
739 }
740
741 @Override
742 public String toString() {
743 return String.valueOf(id());
744 }
745 }
746 }