View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInboundHandler;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.handler.codec.UnsupportedMessageTypeException;
25  import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
26  import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
27  import io.netty.handler.codec.http2.Http2Stream.State;
28  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
29  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.ReferenceCounted;
32  import io.netty.util.internal.UnstableApi;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
37  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
38  
39  /**
40   * <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over this API.
41   * This API is targeted to eventually replace or reduce the need for the {@link Http2ConnectionHandler} API.
42   *
43   * <p>A HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
44   * frame, a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
45   * objects received via {@link #write} are converted to the HTTP/2 wire format. HTTP/2 frames specific to a stream
46   * implement the {@link Http2StreamFrame} interface. The {@link Http2FrameCodec} is instantiated using the
47   * {@link Http2FrameCodecBuilder}. It's recommended for channel handlers to inherit from the
48   * {@link Http2ChannelDuplexHandler}, as it provides additional functionality like iterating over all active streams or
49   * creating outbound streams.
50   *
51   * <h3>Stream Lifecycle</h3>
52   *
53   * The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
54   * {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
55   * {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream.
56   *
57   * <p>{@link Http2StreamFrame}s read from the channel always a {@link Http2FrameStream} object set, while when writing a
58   * {@link Http2StreamFrame} the application code needs to set a {@link Http2FrameStream} object using
59   * {@link Http2StreamFrame#stream(Http2FrameStream)}.
60   *
61   * <h3>Flow control</h3>
62   *
63   * The frame codec automatically increments stream and connection flow control windows.
64   *
65   * <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
66   * number of bytes and the corresponding stream identifier set to the frame codec.
67   *
68   * <p>The local stream-level flow control window can be changed by writing a {@link Http2SettingsFrame} with the
69   * {@link Http2Settings#initialWindowSize()} set to the targeted value.
70   *
71   * <p>The connection-level flow control window can be changed by writing a {@link Http2WindowUpdateFrame} with the
72   * desired window size <em>increment</em> in bytes and the stream identifier set to {@code 0}. By default the initial
73   * connection-level flow control window is the same as initial stream-level flow control window.
74   *
75   * <h3>New inbound Streams</h3>
76   *
77   * The first frame of a HTTP/2 stream must be a {@link Http2HeadersFrame}, which will have a {@link Http2FrameStream}
78   * object attached.
79   *
80   * <h3>New outbound Streams</h3>
81   *
82   * A outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via
83   * {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
84   * attached.
85   *
86   * <pre>
87   *     final Http2Stream2 stream = handler.newStream();
88   *     ctx.write(headersFrame.stream(stream)).addListener(new ChannelFutureListener() {
89   *
90   *         @Override
91   *         public void operationComplete(ChannelFuture f) {
92   *             if (f.isSuccess()) {
93   *                 // Stream is active and stream.id() returns a valid stream identifier.
94   *                 System.out.println("New stream with id " + stream.id() + " created.");
95   *             } else {
96   *                 // Stream failed to become active. Handle error.
97   *                 if (f.cause() instanceof Http2NoMoreStreamIdsException) {
98   *
99   *                 } else if (f.cause() instanceof Http2GoAwayException) {
100  *
101  *                 } else {
102  *
103  *                 }
104  *             }
105  *         }
106  *     }
107  * </pre>
108  *
109  * <p>If a new stream cannot be created due to stream id exhaustion of the endpoint, the {@link ChannelPromise} of the
110  * HEADERS frame will fail with a {@link Http2NoMoreStreamIdsException}.
111  *
112  * <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
113  * {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
114  * the {@link Http2FrameCodec} can be build with
115  * {@link Http2FrameCodecBuilder#encoderEnforceMaxConcurrentStreams(boolean)} enabled, in which case a new stream and
116  * its associated frames will be buffered until either the limit is increased or an active stream is closed. It's,
117  * however, possible that a buffered stream will never become active. That is, the channel might
118  * get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
119  * {@link Http2ChannelClosedException}. In the second case, all writes of buffered streams with an identifier less than
120  * the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
121  *
122  * <h3>Error Handling</h3>
123  *
124  * Exceptions and errors are propagated via {@link ChannelInboundHandler#exceptionCaught}. Exceptions that apply to
125  * a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding
126  * {@link Http2FrameStream} object attached.
127  *
128  * <h3>Reference Counting</h3>
129  *
130  * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
131  * reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before
132  * propagating a reference counted object through the pipeline, and thus an application handler needs to release such
133  * an object after having consumed it. For more information on reference counting take a look at
134  * http://netty.io/wiki/reference-counted-objects.html
135  *
136  * <h3>HTTP Upgrade</h3>
137  *
138  * Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
139  * HTTP-to-HTTP/2 conversion is performed automatically.
140  */
141 @UnstableApi
142 public class Http2FrameCodec extends Http2ConnectionHandler {
143 
144     private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);
145 
146     private final PropertyKey streamKey;
147     private final PropertyKey upgradeKey;
148 
149     private final Integer initialFlowControlWindowSize;
150 
151     private ChannelHandlerContext ctx;
152 
153     /** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/
154     private int numBufferedStreams;
155     private DefaultHttp2FrameStream frameStreamToInitialize;
156 
157     Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings) {
158         super(decoder, encoder, initialSettings);
159 
160         decoder.frameListener(new FrameListener());
161         connection().addListener(new ConnectionListener());
162         connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
163         streamKey = connection().newKey();
164         upgradeKey = connection().newKey();
165         initialFlowControlWindowSize = initialSettings.initialWindowSize();
166     }
167 
168     /**
169      * Creates a new outbound/local stream.
170      */
171     DefaultHttp2FrameStream newStream() {
172         return new DefaultHttp2FrameStream();
173     }
174 
175     /**
176      * Iterates over all active HTTP/2 streams.
177      *
178      * <p>This method must not be called outside of the event loop.
179      */
180     final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
181         assert ctx.executor().inEventLoop();
182 
183         connection().forEachActiveStream(new Http2StreamVisitor() {
184             @Override
185             public boolean visit(Http2Stream stream) {
186                 try {
187                     return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
188                 } catch (Throwable cause) {
189                     onError(ctx, false, cause);
190                     return false;
191                 }
192             }
193         });
194     }
195 
196     @Override
197     public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
198         this.ctx = ctx;
199         super.handlerAdded(ctx);
200         handlerAdded0(ctx);
201         // Must be after Http2ConnectionHandler does its initialization in handlerAdded above.
202         // The server will not send a connection preface so we are good to send a window update.
203         Http2Connection connection = connection();
204         if (connection.isServer()) {
205             tryExpandConnectionFlowControlWindow(connection);
206         }
207     }
208 
209     private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
210         if (initialFlowControlWindowSize != null) {
211             // The window size in the settings explicitly excludes the connection window. So we manually manipulate the
212             // connection window to accommodate more concurrent data per connection.
213             Http2Stream connectionStream = connection.connectionStream();
214             Http2LocalFlowController localFlowController = connection.local().flowController();
215             final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
216             // Only increase the connection window, don't decrease it.
217             if (delta > 0) {
218                 // Double the delta just so a single stream can't exhaust the connection window.
219                 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
220                 flush(ctx);
221             }
222         }
223     }
224 
225     void handlerAdded0(@SuppressWarnings("unsed") ChannelHandlerContext ctx) throws Exception {
226         // sub-class can override this for extra steps that needs to be done when the handler is added.
227     }
228 
229     /**
230      * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
231      * HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
232      */
233     @Override
234     public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
235         if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
236             // The user event implies that we are on the client.
237             tryExpandConnectionFlowControlWindow(connection());
238         } else if (evt instanceof UpgradeEvent) {
239             UpgradeEvent upgrade = (UpgradeEvent) evt;
240             try {
241                 onUpgradeEvent(ctx, upgrade.retain());
242                 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
243                 if (stream.getProperty(streamKey) == null) {
244                     // TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
245                     // The stream was already made active, but ctx may have been null so it wasn't initialized.
246                     // https://github.com/netty/netty/issues/4942
247                     onStreamActive0(stream);
248                 }
249                 upgrade.upgradeRequest().headers().setInt(
250                         HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
251                 stream.setProperty(upgradeKey, true);
252                 InboundHttpToHttp2Adapter.handle(
253                         ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest().retain());
254             } finally {
255                 upgrade.release();
256             }
257             return;
258         }
259         super.userEventTriggered(ctx, evt);
260     }
261 
262     /**
263      * Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
264      * streams.
265      */
266     @Override
267     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
268         if (msg instanceof Http2DataFrame) {
269             Http2DataFrame dataFrame = (Http2DataFrame) msg;
270             encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
271                     dataFrame.padding(), dataFrame.isEndStream(), promise);
272         } else if (msg instanceof Http2HeadersFrame) {
273             writeHeadersFrame(ctx, (Http2HeadersFrame) msg, promise);
274         } else if (msg instanceof Http2WindowUpdateFrame) {
275             Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
276             Http2FrameStream frameStream = frame.stream();
277             // It is legit to send a WINDOW_UPDATE frame for the connection stream. The parent channel doesn't attempt
278             // to set the Http2FrameStream so we assume if it is null the WINDOW_UPDATE is for the connection stream.
279             try {
280                 if (frameStream == null) {
281                     increaseInitialConnectionWindow(frame.windowSizeIncrement());
282                 } else {
283                     consumeBytes(frameStream.id(), frame.windowSizeIncrement());
284                 }
285                 promise.setSuccess();
286             } catch (Throwable t) {
287                 promise.setFailure(t);
288             }
289         } else if (msg instanceof Http2ResetFrame) {
290             Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
291             encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
292         } else if (msg instanceof Http2PingFrame) {
293             Http2PingFrame frame = (Http2PingFrame) msg;
294             encoder().writePing(ctx, frame.ack(), frame.content(), promise);
295         } else if (msg instanceof Http2SettingsFrame) {
296             encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings(), promise);
297         } else if (msg instanceof Http2GoAwayFrame) {
298             writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
299         } else if (msg instanceof Http2UnknownFrame) {
300             Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
301             encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
302                     unknownFrame.flags(), unknownFrame.content(), promise);
303         } else if (!(msg instanceof Http2Frame)) {
304             ctx.write(msg, promise);
305         } else {
306             ReferenceCountUtil.release(msg);
307             throw new UnsupportedMessageTypeException(msg);
308         }
309     }
310 
311     private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
312         // The LocalFlowController is responsible for detecting over/under flow.
313         connection().local().flowController().incrementWindowSize(connection().connectionStream(), deltaBytes);
314     }
315 
316     final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
317         Http2Stream stream = connection().stream(streamId);
318         // upgraded requests are ineligible for stream control
319         if (streamId == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
320             Boolean upgraded = stream.getProperty(upgradeKey);
321             if (Boolean.TRUE.equals(upgraded)) {
322                 return false;
323             }
324         }
325 
326         return connection().local().flowController().consumeBytes(stream, bytes);
327     }
328 
329     private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame, ChannelPromise promise) {
330         if (frame.lastStreamId() > -1) {
331             frame.release();
332             throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
333         }
334 
335         int lastStreamCreated = connection().remote().lastStreamCreated();
336         long lastStreamId = lastStreamCreated + ((long) frame.extraStreamIds()) * 2;
337         // Check if the computation overflowed.
338         if (lastStreamId > Integer.MAX_VALUE) {
339             lastStreamId = Integer.MAX_VALUE;
340         }
341         goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise);
342     }
343 
344     private void writeHeadersFrame(
345             final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame, final ChannelPromise promise) {
346 
347         if (isStreamIdValid(headersFrame.stream().id())) {
348             encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(),
349                     headersFrame.isEndStream(), promise);
350         } else {
351             final DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) headersFrame.stream();
352             final Http2Connection connection = connection();
353             final int streamId = connection.local().incrementAndGetNextStreamId();
354             if (streamId < 0) {
355                 promise.setFailure(new Http2NoMoreStreamIdsException());
356                 return;
357             }
358             stream.id = streamId;
359 
360             // TODO: This depends on the fact that the connection based API will create Http2Stream objects
361             // synchronously. We should investigate how to refactor this later on when we consolidate some layers.
362             assert frameStreamToInitialize == null;
363             frameStreamToInitialize = stream;
364 
365             // TODO(buchgr): Once Http2Stream2 and Http2Stream are merged this is no longer necessary.
366             final ChannelPromise writePromise = ctx.newPromise();
367 
368             encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
369                     headersFrame.isEndStream(), writePromise);
370             if (writePromise.isDone()) {
371                 notifyHeaderWritePromise(writePromise, promise);
372             } else {
373                 numBufferedStreams++;
374 
375                 writePromise.addListener(new ChannelFutureListener() {
376                     @Override
377                     public void operationComplete(ChannelFuture future) throws Exception {
378                         numBufferedStreams--;
379 
380                         notifyHeaderWritePromise(future, promise);
381                     }
382                 });
383             }
384         }
385     }
386 
387     private static void notifyHeaderWritePromise(ChannelFuture future, ChannelPromise promise) {
388         Throwable cause = future.cause();
389         if (cause == null) {
390             promise.setSuccess();
391         } else {
392             promise.setFailure(cause);
393         }
394     }
395 
396     private void onStreamActive0(Http2Stream stream) {
397         if (connection().local().isValidStreamId(stream.id())) {
398             return;
399         }
400 
401         DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
402         onHttp2StreamStateChanged(ctx, stream2);
403     }
404 
405     private final class ConnectionListener extends Http2ConnectionAdapter {
406 
407         @Override
408         public void onStreamAdded(Http2Stream stream) {
409              if (frameStreamToInitialize != null && stream.id() == frameStreamToInitialize.id()) {
410                  frameStreamToInitialize.setStreamAndProperty(streamKey, stream);
411                  frameStreamToInitialize = null;
412              }
413          }
414 
415         @Override
416         public void onStreamActive(Http2Stream stream) {
417             onStreamActive0(stream);
418         }
419 
420         @Override
421         public void onStreamClosed(Http2Stream stream) {
422             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
423             if (stream2 != null) {
424                 onHttp2StreamStateChanged(ctx, stream2);
425             }
426         }
427 
428         @Override
429         public void onStreamHalfClosed(Http2Stream stream) {
430             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
431             if (stream2 != null) {
432                 onHttp2StreamStateChanged(ctx, stream2);
433             }
434         }
435     }
436 
437     @Override
438     protected void onConnectionError(
439             ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
440         if (!outbound) {
441             // allow the user to handle it first in the pipeline, and then automatically clean up.
442             // If this is not desired behavior the user can override this method.
443             //
444             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
445             ctx.fireExceptionCaught(cause);
446         }
447         super.onConnectionError(ctx, outbound, cause, http2Ex);
448     }
449 
450     /**
451      * Exceptions for unknown streams, that is streams that have no {@link Http2FrameStream} object attached
452      * are simply logged and replied to by sending a RST_STREAM frame.
453      */
454     @Override
455     protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
456                                  Http2Exception.StreamException streamException) {
457         int streamId = streamException.streamId();
458         Http2Stream connectionStream = connection().stream(streamId);
459         if (connectionStream == null) {
460             onHttp2UnknownStreamError(ctx, cause, streamException);
461             // Write a RST_STREAM
462             super.onStreamError(ctx, outbound, cause, streamException);
463             return;
464         }
465 
466         Http2FrameStream stream = connectionStream.getProperty(streamKey);
467         if (stream == null) {
468             LOG.warn("Stream exception thrown without stream object attached.", cause);
469             // Write a RST_STREAM
470             super.onStreamError(ctx, outbound, cause, streamException);
471             return;
472         }
473 
474         if (!outbound) {
475             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
476             onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
477         }
478     }
479 
480     void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx, Throwable cause,
481                                    Http2Exception.StreamException streamException) {
482         // Just log....
483         LOG.warn("Stream exception thrown for unkown stream {}.", streamException.streamId(), cause);
484     }
485 
486     @Override
487     protected final boolean isGracefulShutdownComplete() {
488         return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
489     }
490 
491     private final class FrameListener implements Http2FrameListener {
492 
493         @Override
494         public void onUnknownFrame(
495                 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
496             onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
497                     .stream(requireStream(streamId)).retain());
498         }
499 
500         @Override
501         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
502             onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
503         }
504 
505         @Override
506         public void onPingRead(ChannelHandlerContext ctx, long data) {
507             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
508         }
509 
510         @Override
511         public void onPingAckRead(ChannelHandlerContext ctx, long data) {
512             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
513         }
514 
515         @Override
516         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
517             onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
518         }
519 
520         @Override
521         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
522             if (streamId == 0) {
523                 // Ignore connection window updates.
524                 return;
525             }
526             onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
527         }
528 
529         @Override
530         public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
531                                   Http2Headers headers, int streamDependency, short weight, boolean
532                                           exclusive, int padding, boolean endStream) {
533             onHeadersRead(ctx, streamId, headers, padding, endStream);
534         }
535 
536         @Override
537         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
538                                   int padding, boolean endOfStream) {
539             onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
540                                         .stream(requireStream(streamId)));
541         }
542 
543         @Override
544         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
545                               boolean endOfStream) {
546             onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
547                                         .stream(requireStream(streamId)).retain());
548             // We return the bytes in consumeBytes() once the stream channel consumed the bytes.
549             return 0;
550         }
551 
552         @Override
553         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
554             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain());
555         }
556 
557         @Override
558         public void onPriorityRead(
559                 ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {
560             // TODO: Maybe handle me
561         }
562 
563         @Override
564         public void onSettingsAckRead(ChannelHandlerContext ctx) {
565             // TODO: Maybe handle me
566         }
567 
568         @Override
569         public void onPushPromiseRead(
570                 ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding)  {
571             // TODO: Maybe handle me
572         }
573 
574         private Http2FrameStream requireStream(int streamId) {
575             Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
576             if (stream == null) {
577                 throw new IllegalStateException("Stream object required for identifier: " + streamId);
578             }
579             return stream;
580         }
581     }
582 
583     void onUpgradeEvent(ChannelHandlerContext ctx, UpgradeEvent evt) {
584         ctx.fireUserEventTriggered(evt);
585     }
586 
587     void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream,
588                                          @SuppressWarnings("unused") boolean writable) {
589         ctx.fireUserEventTriggered(Http2FrameStreamEvent.writabilityChanged(stream));
590     }
591 
592     void onHttp2StreamStateChanged(ChannelHandlerContext ctx, Http2FrameStream stream) {
593         ctx.fireUserEventTriggered(Http2FrameStreamEvent.stateChanged(stream));
594     }
595 
596     void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
597         ctx.fireChannelRead(frame);
598     }
599 
600     void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
601         ctx.fireExceptionCaught(cause);
602     }
603 
604     final boolean isWritable(DefaultHttp2FrameStream stream) {
605         Http2Stream s = stream.stream;
606         return s != null && connection().remote().flowController().isWritable(s);
607     }
608 
609     private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
610         @Override
611         public void writabilityChanged(Http2Stream stream) {
612             Http2FrameStream frameStream = stream.getProperty(streamKey);
613             if (frameStream == null) {
614                 return;
615             }
616             onHttp2StreamWritabilityChanged(
617                     ctx, frameStream, connection().remote().flowController().isWritable(stream));
618         }
619     }
620 
621     /**
622      * {@link Http2FrameStream} implementation.
623      */
624     // TODO(buchgr): Merge Http2FrameStream and Http2Stream.
625     static class DefaultHttp2FrameStream implements Http2FrameStream {
626 
627         private volatile int id = -1;
628         volatile Http2Stream stream;
629 
630         DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
631             assert id == -1 || stream.id() == id;
632             this.stream = stream;
633             stream.setProperty(streamKey, this);
634             return this;
635         }
636 
637         @Override
638         public int id() {
639             Http2Stream stream = this.stream;
640             return stream == null ? id : stream.id();
641         }
642 
643         @Override
644         public State state() {
645             Http2Stream stream = this.stream;
646             return stream == null ? State.IDLE : stream.state();
647         }
648 
649         @Override
650         public String toString() {
651             return String.valueOf(id());
652         }
653     }
654 }