1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelInboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.UnsupportedMessageTypeException;
26 import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
27 import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
28 import io.netty.handler.codec.http2.Http2Stream.State;
29 import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
30 import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
31 import io.netty.util.ReferenceCountUtil;
32 import io.netty.util.ReferenceCounted;
33 import io.netty.util.collection.IntObjectHashMap;
34 import io.netty.util.collection.IntObjectMap;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import static io.netty.buffer.ByteBufUtil.writeAscii;
39 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
40 import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
41 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
42 import static io.netty.util.internal.logging.InternalLogLevel.DEBUG;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144 public class Http2FrameCodec extends Http2ConnectionHandler {
145
146 private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);
147
148 private static final Class<?>[] SUPPORTED_MESSAGES = new Class[] {
149 Http2DataFrame.class, Http2HeadersFrame.class, Http2WindowUpdateFrame.class, Http2ResetFrame.class,
150 Http2PingFrame.class, Http2SettingsFrame.class, Http2SettingsAckFrame.class, Http2GoAwayFrame.class,
151 Http2PushPromiseFrame.class, Http2PriorityFrame.class, Http2UnknownFrame.class };
152
153 protected final PropertyKey streamKey;
154 private final PropertyKey upgradeKey;
155
156 private final Integer initialFlowControlWindowSize;
157
158 ChannelHandlerContext ctx;
159
160
161
162
163 private int numBufferedStreams;
164 private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
165 new IntObjectHashMap<DefaultHttp2FrameStream>(8);
166
167 protected Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder,
168 Http2Settings initialSettings, boolean decoupleCloseAndGoAway, boolean flushPreface) {
169 super(decoder, encoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
170
171 decoder.frameListener(new FrameListener());
172 connection().addListener(new ConnectionListener());
173 connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
174 streamKey = connection().newKey();
175 upgradeKey = connection().newKey();
176 initialFlowControlWindowSize = initialSettings.initialWindowSize();
177 }
178
179
180
181
182 DefaultHttp2FrameStream newStream() {
183 return new DefaultHttp2FrameStream();
184 }
185
186
187
188
189
190
191 final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
192 assert ctx.executor().inEventLoop();
193 if (connection().numActiveStreams() > 0) {
194 connection().forEachActiveStream(new Http2StreamVisitor() {
195 @Override
196 public boolean visit(Http2Stream stream) {
197 try {
198 return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
199 } catch (Throwable cause) {
200 onError(ctx, false, cause);
201 return false;
202 }
203 }
204 });
205 }
206 }
207
208
209
210
211
212
213 int numInitializingStreams() {
214 return frameStreamToInitializeMap.size();
215 }
216
217 @Override
218 public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
219 this.ctx = ctx;
220 super.handlerAdded(ctx);
221 handlerAdded0(ctx);
222
223
224 Http2Connection connection = connection();
225 if (connection.isServer()) {
226 tryExpandConnectionFlowControlWindow(connection);
227 }
228 }
229
230 private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
231 if (initialFlowControlWindowSize != null) {
232
233
234 Http2Stream connectionStream = connection.connectionStream();
235 Http2LocalFlowController localFlowController = connection.local().flowController();
236 final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
237
238 if (delta > 0) {
239
240 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
241 flush(ctx);
242 }
243 }
244 }
245
246 void handlerAdded0(@SuppressWarnings("unsed") ChannelHandlerContext ctx) throws Exception {
247
248 }
249
250
251
252
253
254 @Override
255 public final void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
256 if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
257
258 tryExpandConnectionFlowControlWindow(connection());
259
260
261
262
263 ctx.executor().execute(new Runnable() {
264 @Override
265 public void run() {
266 ctx.fireUserEventTriggered(evt);
267 }
268 });
269 } else if (evt instanceof UpgradeEvent) {
270 UpgradeEvent upgrade = (UpgradeEvent) evt;
271 try {
272 onUpgradeEvent(ctx, upgrade.retain());
273 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
274 if (stream.getProperty(streamKey) == null) {
275
276
277
278 onStreamActive0(stream);
279 }
280 upgrade.upgradeRequest().headers().setInt(
281 HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
282 stream.setProperty(upgradeKey, true);
283 InboundHttpToHttp2Adapter.handle(
284 ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest().retain());
285 } finally {
286 upgrade.release();
287 }
288 } else {
289 onUserEventTriggered(ctx, evt);
290 ctx.fireUserEventTriggered(evt);
291 }
292 }
293
294 void onUserEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
295
296 }
297
298
299
300
301
302 @Override
303 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
304 if (msg instanceof Http2DataFrame) {
305 Http2DataFrame dataFrame = (Http2DataFrame) msg;
306 encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
307 dataFrame.padding(), dataFrame.isEndStream(), promise);
308 } else if (msg instanceof Http2HeadersFrame) {
309 writeHeadersFrame(ctx, (Http2HeadersFrame) msg, promise);
310 } else if (msg instanceof Http2WindowUpdateFrame) {
311 Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
312 Http2FrameStream frameStream = frame.stream();
313
314
315 try {
316 if (frameStream == null) {
317 increaseInitialConnectionWindow(frame.windowSizeIncrement());
318 } else {
319 consumeBytes(frameStream.id(), frame.windowSizeIncrement());
320 }
321 promise.setSuccess();
322 } catch (Throwable t) {
323 promise.setFailure(t);
324 }
325 } else if (msg instanceof Http2ResetFrame) {
326 Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
327 int id = rstFrame.stream().id();
328
329
330 if (connection().streamMayHaveExisted(id)) {
331 encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
332 } else {
333 ReferenceCountUtil.release(rstFrame);
334 promise.setFailure(Http2Exception.streamError(
335 rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed"));
336 }
337 } else if (msg instanceof Http2PingFrame) {
338 Http2PingFrame frame = (Http2PingFrame) msg;
339 encoder().writePing(ctx, frame.ack(), frame.content(), promise);
340 } else if (msg instanceof Http2SettingsFrame) {
341 encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings(), promise);
342 } else if (msg instanceof Http2SettingsAckFrame) {
343
344
345 encoder().writeSettingsAck(ctx, promise);
346 } else if (msg instanceof Http2GoAwayFrame) {
347 writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
348 } else if (msg instanceof Http2PushPromiseFrame) {
349 Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg;
350 writePushPromise(ctx, pushPromiseFrame, promise);
351 } else if (msg instanceof Http2PriorityFrame) {
352 Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg;
353 encoder().writePriority(ctx, priorityFrame.stream().id(), priorityFrame.streamDependency(),
354 priorityFrame.weight(), priorityFrame.exclusive(), promise);
355 } else if (msg instanceof Http2UnknownFrame) {
356 Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
357 encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
358 unknownFrame.flags(), unknownFrame.content(), promise);
359 } else if (!(msg instanceof Http2Frame)) {
360 ctx.write(msg, promise);
361 } else {
362 ReferenceCountUtil.release(msg);
363 throw new UnsupportedMessageTypeException(msg, SUPPORTED_MESSAGES);
364 }
365 }
366
367 private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
368
369 connection().local().flowController().incrementWindowSize(connection().connectionStream(), deltaBytes);
370 }
371
372 final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
373 Http2Stream stream = connection().stream(streamId);
374
375
376 if (stream != null && streamId == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
377 Boolean upgraded = stream.getProperty(upgradeKey);
378 if (Boolean.TRUE.equals(upgraded)) {
379 return false;
380 }
381 }
382
383 return connection().local().flowController().consumeBytes(stream, bytes);
384 }
385
386 private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame, ChannelPromise promise) {
387 if (frame.lastStreamId() > -1) {
388 frame.release();
389 throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
390 }
391
392 int lastStreamCreated = connection().remote().lastStreamCreated();
393 long lastStreamId = lastStreamCreated + ((long) frame.extraStreamIds()) * 2;
394
395 if (lastStreamId > Integer.MAX_VALUE) {
396 lastStreamId = Integer.MAX_VALUE;
397 }
398 goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise);
399 }
400
401 private void writeHeadersFrame(final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame,
402 ChannelPromise promise) {
403
404 if (isStreamIdValid(headersFrame.stream().id())) {
405 encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(),
406 headersFrame.isEndStream(), promise);
407 } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) headersFrame.stream(), promise)) {
408 promise = promise.unvoid();
409
410 final int streamId = headersFrame.stream().id();
411
412 encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
413 headersFrame.isEndStream(), promise);
414
415 if (!promise.isDone()) {
416 numBufferedStreams++;
417
418
419 promise.addListener((ChannelFutureListener) channelFuture -> {
420 numBufferedStreams--;
421 handleHeaderFuture(channelFuture, streamId);
422 });
423 } else {
424 handleHeaderFuture(promise, streamId);
425 }
426 }
427 }
428
429 private void writePushPromise(final ChannelHandlerContext ctx, Http2PushPromiseFrame pushPromiseFrame,
430 final ChannelPromise promise) {
431 if (isStreamIdValid(pushPromiseFrame.pushStream().id())) {
432 encoder().writePushPromise(ctx, pushPromiseFrame.stream().id(), pushPromiseFrame.pushStream().id(),
433 pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
434 } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) pushPromiseFrame.pushStream(), promise)) {
435 final int streamId = pushPromiseFrame.stream().id();
436 encoder().writePushPromise(ctx, streamId, pushPromiseFrame.pushStream().id(),
437 pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
438
439 if (promise.isDone()) {
440 handleHeaderFuture(promise, streamId);
441 } else {
442 numBufferedStreams++;
443
444
445 promise.addListener((ChannelFutureListener) channelFuture -> {
446 numBufferedStreams--;
447 handleHeaderFuture(channelFuture, streamId);
448 });
449 }
450 }
451 }
452
453 private boolean initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream,
454 ChannelPromise promise) {
455 final Http2Connection connection = connection();
456 final int streamId = connection.local().incrementAndGetNextStreamId();
457 if (streamId < 0) {
458 promise.setFailure(new Http2NoMoreStreamIdsException());
459
460
461
462 onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE :
463 Integer.MAX_VALUE - 1, NO_ERROR.code(),
464 writeAscii(ctx.alloc(), "Stream IDs exhausted on local stream creation")));
465
466 return false;
467 }
468 http2FrameStream.id = streamId;
469
470
471
472
473
474
475 Object old = frameStreamToInitializeMap.put(streamId, http2FrameStream);
476
477
478 assert old == null;
479 return true;
480 }
481
482 private void handleHeaderFuture(ChannelFuture channelFuture, int streamId) {
483 if (!channelFuture.isSuccess()) {
484 frameStreamToInitializeMap.remove(streamId);
485 }
486 }
487
488 private void onStreamActive0(Http2Stream stream) {
489 if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID &&
490 connection().local().isValidStreamId(stream.id())) {
491 return;
492 }
493
494 DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
495 onHttp2StreamStateChanged(ctx, stream2);
496 }
497
498 private final class ConnectionListener extends Http2ConnectionAdapter {
499 @Override
500 public void onStreamAdded(Http2Stream stream) {
501 DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());
502
503 if (frameStream != null) {
504 frameStream.setStreamAndProperty(streamKey, stream);
505 }
506 }
507
508 @Override
509 public void onStreamActive(Http2Stream stream) {
510 onStreamActive0(stream);
511 }
512
513 @Override
514 public void onStreamClosed(Http2Stream stream) {
515 onHttp2StreamStateChanged0(stream);
516 }
517
518 @Override
519 public void onStreamHalfClosed(Http2Stream stream) {
520 onHttp2StreamStateChanged0(stream);
521 }
522
523 private void onHttp2StreamStateChanged0(Http2Stream stream) {
524 DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
525 if (stream2 != null) {
526 onHttp2StreamStateChanged(ctx, stream2);
527 }
528 }
529 }
530
531 @Override
532 protected void onConnectionError(
533 ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
534 if (!outbound) {
535
536
537
538
539 ctx.fireExceptionCaught(cause);
540 }
541 super.onConnectionError(ctx, outbound, cause, http2Ex);
542 }
543
544
545
546
547
548 @Override
549 protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
550 Http2Exception.StreamException streamException) {
551 int streamId = streamException.streamId();
552 Http2Stream connectionStream = connection().stream(streamId);
553 if (connectionStream == null) {
554 onHttp2UnknownStreamError(ctx, cause, streamException);
555
556 super.onStreamError(ctx, outbound, cause, streamException);
557 return;
558 }
559
560 Http2FrameStream stream = connectionStream.getProperty(streamKey);
561 if (stream == null) {
562 LOG.warn("{} Stream exception thrown without stream object attached.", ctx.channel(), cause);
563
564 super.onStreamError(ctx, outbound, cause, streamException);
565 return;
566 }
567
568 if (!outbound) {
569
570 onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
571 }
572 }
573
574 private static void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx,
575 Throwable cause, Http2Exception.StreamException streamException) {
576
577
578
579
580
581 LOG.log(DEBUG, "{} Stream exception thrown for unknown stream {}.",
582 ctx.channel(), streamException.streamId(), cause);
583 }
584
585 @Override
586 protected final boolean isGracefulShutdownComplete() {
587 return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
588 }
589
590 private final class FrameListener implements Http2FrameListener {
591
592 @Override
593 public void onUnknownFrame(
594 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
595 if (streamId == 0) {
596
597 return;
598 }
599 Http2FrameStream stream = requireStream(streamId);
600 onHttp2Frame(ctx, newHttp2UnknownFrame(frameType, streamId, flags, payload.retain()).stream(stream));
601 }
602
603 @Override
604 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
605 onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
606 }
607
608 @Override
609 public void onPingRead(ChannelHandlerContext ctx, long data) {
610 onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
611 }
612
613 @Override
614 public void onPingAckRead(ChannelHandlerContext ctx, long data) {
615 onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
616 }
617
618 @Override
619 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
620 Http2FrameStream stream = requireStream(streamId);
621 onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(stream));
622 }
623
624 @Override
625 public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
626 if (streamId == 0) {
627
628 return;
629 }
630 Http2FrameStream stream = requireStream(streamId);
631 onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(stream));
632 }
633
634 @Override
635 public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
636 Http2Headers headers, int streamDependency, short weight, boolean
637 exclusive, int padding, boolean endStream) {
638 onHeadersRead(ctx, streamId, headers, padding, endStream);
639 }
640
641 @Override
642 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
643 int padding, boolean endOfStream) {
644 Http2FrameStream stream = requireStream(streamId);
645 onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding).stream(stream));
646 }
647
648 @Override
649 public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
650 boolean endOfStream) {
651 Http2FrameStream stream = requireStream(streamId);
652 final Http2DataFrame dataframe;
653 try {
654 dataframe = new DefaultHttp2DataFrame(data.retain(), endOfStream, padding);
655 } catch (IllegalArgumentException e) {
656
657 data.release();
658 throw e;
659 }
660 dataframe.stream(stream);
661 onHttp2Frame(ctx, dataframe);
662
663 return 0;
664 }
665
666 @Override
667 public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
668 onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData.retain()));
669 }
670
671 @Override
672 public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
673 short weight, boolean exclusive) {
674
675 Http2Stream stream = connection().stream(streamId);
676 if (stream == null) {
677
678 return;
679 }
680 Http2FrameStream frameStream = requireStream(streamId);
681 onHttp2Frame(ctx, new DefaultHttp2PriorityFrame(streamDependency, weight, exclusive)
682 .stream(frameStream));
683 }
684
685 @Override
686 public void onSettingsAckRead(ChannelHandlerContext ctx) {
687 onHttp2Frame(ctx, Http2SettingsAckFrame.INSTANCE);
688 }
689
690 @Override
691 public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
692 Http2Headers headers, int padding) {
693 Http2FrameStream stream = requireStream(streamId);
694 onHttp2Frame(ctx, new DefaultHttp2PushPromiseFrame(headers, padding, promisedStreamId)
695 .pushStream(new DefaultHttp2FrameStream()
696 .setStreamAndProperty(streamKey, connection().stream(promisedStreamId)))
697 .stream(stream));
698 }
699
700 private Http2FrameStream requireStream(int streamId) {
701 Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
702 if (stream == null) {
703 throw new IllegalStateException("Stream object required for identifier: " + streamId);
704 }
705 return stream;
706 }
707 }
708
709 private void onUpgradeEvent(ChannelHandlerContext ctx, UpgradeEvent evt) {
710 ctx.fireUserEventTriggered(evt);
711 }
712
713 private void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream,
714 @SuppressWarnings("unused") boolean writable) {
715 ctx.fireUserEventTriggered(stream.writabilityChanged);
716 }
717
718 void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
719 ctx.fireUserEventTriggered(stream.stateChanged);
720 }
721
722 void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
723 ctx.fireChannelRead(frame);
724 }
725
726
727
728
729 protected Http2StreamFrame newHttp2UnknownFrame(byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
730 return new DefaultHttp2UnknownFrame(frameType, flags, payload);
731 }
732
733 void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
734 ctx.fireExceptionCaught(cause);
735 }
736
737 private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
738 @Override
739 public void writabilityChanged(Http2Stream stream) {
740 DefaultHttp2FrameStream frameStream = stream.getProperty(streamKey);
741 if (frameStream == null) {
742 return;
743 }
744 onHttp2StreamWritabilityChanged(
745 ctx, frameStream, connection().remote().flowController().isWritable(stream));
746 }
747 }
748
749
750
751
752
753 static class DefaultHttp2FrameStream implements Http2FrameStream {
754
755 private volatile int id = -1;
756 private volatile Http2Stream stream;
757
758 final Http2FrameStreamEvent stateChanged = Http2FrameStreamEvent.stateChanged(this);
759 final Http2FrameStreamEvent writabilityChanged = Http2FrameStreamEvent.writabilityChanged(this);
760
761 Channel attachment;
762
763 DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
764 assert id == -1 || stream.id() == id;
765 this.stream = stream;
766 this.id = stream.id();
767 stream.setProperty(streamKey, this);
768 return this;
769 }
770
771 @Override
772 public int id() {
773 Http2Stream stream = this.stream;
774 return stream == null ? id : stream.id();
775 }
776
777 @Override
778 public State state() {
779 Http2Stream stream = this.stream;
780 return stream == null ? State.IDLE : stream.state();
781 }
782
783 @Override
784 public String toString() {
785 return String.valueOf(id());
786 }
787 }
788 }