View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInboundHandler;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
25  import io.netty.handler.codec.http2.Http2Stream.State;
26  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
27  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
28  import io.netty.handler.codec.UnsupportedMessageTypeException;
29  import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
30  import io.netty.util.ReferenceCountUtil;
31  
32  import io.netty.util.ReferenceCounted;
33  import io.netty.util.internal.UnstableApi;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
38  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
39  
40  /**
41   * <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over this API.
42   * This API is targeted to eventually replace or reduce the need for the {@link Http2ConnectionHandler} API.
43   *
44   * <p>A HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
45   * frame, a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
46   * objects received via {@link #write} are converted to the HTTP/2 wire format. HTTP/2 frames specific to a stream
47   * implement the {@link Http2StreamFrame} interface. The {@link Http2FrameCodec} is instantiated using the
48   * {@link Http2FrameCodecBuilder}. It's recommended for channel handlers to inherit from the
49   * {@link Http2ChannelDuplexHandler}, as it provides additional functionality like iterating over all active streams or
50   * creating outbound streams.
51   *
52   * <h3>Stream Lifecycle</h3>
53   *
54   * The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
55   * {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
56   * {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream.
57   *
58   * <p>{@link Http2StreamFrame}s read from the channel always have a {@link Http2FrameStream} object set, while when
59   * writing a {@link Http2StreamFrame} the application code needs to set a {@link Http2FrameStream} object using
60   * {@link Http2StreamFrame#stream(Http2FrameStream)}.
61   *
62   * <h3>Flow control</h3>
63   *
64   * The frame codec automatically increments stream and connection flow control windows.
65   *
66   * <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
67   * number of bytes and the corresponding stream identifier set to the frame codec.
68   *
69   * <p>The local stream-level flow control window can be changed by writing a {@link Http2SettingsFrame} with the
70   * {@link Http2Settings#initialWindowSize()} set to the targeted value.
71   *
72   * <p>The connection-level flow control window can be changed by writing a {@link Http2WindowUpdateFrame} with the
73   * desired window size <em>increment</em> in bytes and the stream identifier set to {@code 0}. By default the initial
74   * connection-level flow control window is the same as initial stream-level flow control window.
75   *
76   * <h3>New inbound Streams</h3>
77   *
78   * The first frame of a HTTP/2 stream must be a {@link Http2HeadersFrame}, which will have a {@link Http2FrameStream}
79   * object attached.
80   *
81   * <h3>New outbound Streams</h3>
82   *
83   * An outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via
84   * {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
85   * attached.
86   *
87   * <pre>
88   *     final Http2FrameStream stream = handler.newStream();
89   *     ctx.write(headersFrame.stream(stream)).addListener(new ChannelFutureListener() {
90   *
91   *         @Override
92   *         public void operationComplete(ChannelFuture f) {
93   *             if (f.isSuccess()) {
94   *                 // Stream is active and stream.id() returns a valid stream identifier.
95   *                 System.out.println("New stream with id " + stream.id() + " created.");
96   *             } else {
97   *                 // Stream failed to become active. Handle error.
98   *                 if (f.cause() instanceof Http2NoMoreStreamIdsException) {
99   *
100  *                 } else if (f.cause() instanceof Http2GoAwayException) {
101  *
102  *                 } else {
103  *
104  *                 }
105  *             }
106  *         }
107  *     });
108  * </pre>
109  *
110  * <p>If a new stream cannot be created due to stream id exhaustion of the endpoint, the {@link ChannelPromise} of the
111  * HEADERS frame will fail with a {@link Http2NoMoreStreamIdsException}.
112  *
113  * <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
114  * {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
115  * the {@link Http2FrameCodec} can be built with
116  * {@link Http2FrameCodecBuilder#encoderEnforceMaxConcurrentStreams(boolean)} enabled, in which case a new stream and
117  * its associated frames will be buffered until either the limit is increased or an active stream is closed. It's,
118  * however, possible that a buffered stream will never become active. That is, the channel might
119  * get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
120  * {@link Http2ChannelClosedException}. In the second case, all writes of buffered streams with an identifier less than
121  * the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
122  *
123  * <h3>Error Handling</h3>
124  *
125  * Exceptions and errors are propagated via {@link ChannelInboundHandler#exceptionCaught}. Exceptions that apply to
126  * a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding
127  * {@link Http2FrameStream} object attached.
128  *
129  * <h3>Reference Counting</h3>
130  *
131  * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
132  * reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before
133  * propagating a reference counted object through the pipeline, and thus an application handler needs to release such
134  * an object after having consumed it. For more information on reference counting take a look at
135  * http://netty.io/wiki/reference-counted-objects.html
136  *
137  * <h3>HTTP Upgrade</h3>
138  *
139  * Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
140  * HTTP-to-HTTP/2 conversion is performed automatically.
141  */
142 @UnstableApi
143 public class Http2FrameCodec extends Http2ConnectionHandler {
144 
145     private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);
146 
147     private final PropertyKey streamKey;
148     private final PropertyKey upgradeKey;
149 
150     private final Integer initialFlowControlWindowSize;
151 
152     private ChannelHandlerContext ctx;
153 
154     /** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/
155     private int numBufferedStreams;
156     private DefaultHttp2FrameStream frameStreamToInitialize;
157 
158     Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings) {
159         super(decoder, encoder, initialSettings);
160 
161         decoder.frameListener(new FrameListener());
162         connection().addListener(new ConnectionListener());
163         connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
164         streamKey = connection().newKey();
165         upgradeKey = connection().newKey();
166         initialFlowControlWindowSize = initialSettings.initialWindowSize();
167     }
168 
169     /**
170      * Creates a new outbound/local stream.
171      */
172     DefaultHttp2FrameStream newStream() {
173         return new DefaultHttp2FrameStream();
174     }
175 
176     /**
177      * Iterates over all active HTTP/2 streams.
178      *
179      * <p>This method must not be called outside of the event loop.
180      */
181     final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
182         assert ctx.executor().inEventLoop();
183 
184         connection().forEachActiveStream(new Http2StreamVisitor() {
185             @Override
186             public boolean visit(Http2Stream stream) {
187                 try {
188                     return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
189                 } catch (Throwable cause) {
190                     onError(ctx, cause);
191                     return false;
192                 }
193             }
194         });
195     }
196 
197     @Override
198     public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
199         this.ctx = ctx;
200         super.handlerAdded(ctx);
201         handlerAdded0(ctx);
202         // Must be after Http2ConnectionHandler does its initialization in handlerAdded above.
203         // The server will not send a connection preface so we are good to send a window update.
204         Http2Connection connection = connection();
205         if (connection.isServer()) {
206             tryExpandConnectionFlowControlWindow(connection);
207         }
208     }
209 
210     private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
211         if (initialFlowControlWindowSize != null) {
212             // The window size in the settings explicitly excludes the connection window. So we manually manipulate the
213             // connection window to accommodate more concurrent data per connection.
214             Http2Stream connectionStream = connection.connectionStream();
215             Http2LocalFlowController localFlowController = connection.local().flowController();
216             final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
217             // Only increase the connection window, don't decrease it.
218             if (delta > 0) {
219                 // Double the delta just so a single stream can't exhaust the connection window.
220                 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
221                 flush(ctx);
222             }
223         }
224     }
225 
226     void handlerAdded0(@SuppressWarnings("unsed") ChannelHandlerContext ctx) throws Exception {
227         // sub-class can override this for extra steps that needs to be done when the handler is added.
228     }
229 
230     /**
231      * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
232      * HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
233      */
234     @Override
235     public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
236         if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
237             // The user event implies that we are on the client.
238             tryExpandConnectionFlowControlWindow(connection());
239         } else if (evt instanceof UpgradeEvent) {
240             UpgradeEvent upgrade = (UpgradeEvent) evt;
241             try {
242                 onUpgradeEvent(ctx, upgrade.retain());
243                 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
244                 if (stream.getProperty(streamKey) == null) {
245                     // TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
246                     // The stream was already made active, but ctx may have been null so it wasn't initialized.
247                     // https://github.com/netty/netty/issues/4942
248                     onStreamActive0(stream);
249                 }
250                 upgrade.upgradeRequest().headers().setInt(
251                         HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
252                 stream.setProperty(upgradeKey, true);
253                 InboundHttpToHttp2Adapter.handle(
254                         ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest().retain());
255             } finally {
256                 upgrade.release();
257             }
258             return;
259         }
260         super.userEventTriggered(ctx, evt);
261     }
262 
263     /**
264      * Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
265      * streams.
266      */
267     @Override
268     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
269         if (msg instanceof Http2DataFrame) {
270             Http2DataFrame dataFrame = (Http2DataFrame) msg;
271             encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
272                     dataFrame.padding(), dataFrame.isEndStream(), promise);
273         } else if (msg instanceof Http2HeadersFrame) {
274             writeHeadersFrame(ctx, (Http2HeadersFrame) msg, promise);
275         } else if (msg instanceof Http2WindowUpdateFrame) {
276             Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
277             writeWindowUpdate(frame.stream().id(), frame.windowSizeIncrement(), promise);
278         } else if (msg instanceof Http2ResetFrame) {
279             Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
280             encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
281         } else if (msg instanceof Http2PingFrame) {
282             Http2PingFrame frame = (Http2PingFrame) msg;
283             encoder().writePing(ctx, frame.ack(), frame.content(), promise);
284         } else if (msg instanceof Http2SettingsFrame) {
285             encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings(), promise);
286         } else if (msg instanceof Http2GoAwayFrame) {
287             writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
288         } else if (!(msg instanceof Http2Frame)) {
289             ctx.write(msg, promise);
290         } else {
291             ReferenceCountUtil.release(msg);
292             throw new UnsupportedMessageTypeException(msg);
293         }
294     }
295 
296     private void writeWindowUpdate(int streamId, int bytes, ChannelPromise promise) {
297         try {
298             if (streamId == 0) {
299                 increaseInitialConnectionWindow(bytes);
300             } else {
301                 consumeBytes(streamId, bytes);
302             }
303             promise.setSuccess();
304         } catch (Throwable t) {
305             promise.setFailure(t);
306         }
307     }
308 
309     private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
310         Http2LocalFlowController localFlow = connection().local().flowController();
311         int targetConnectionWindow = localFlow.initialWindowSize() + deltaBytes;
312         localFlow.incrementWindowSize(connection().connectionStream(), deltaBytes);
313         localFlow.initialWindowSize(targetConnectionWindow);
314     }
315 
316     final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
317         Http2Stream stream = connection().stream(streamId);
318         // upgraded requests are ineligible for stream control
319         if (streamId == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
320             Boolean upgraded = stream.getProperty(upgradeKey);
321             if (Boolean.TRUE.equals(upgraded)) {
322                 return false;
323             }
324         }
325 
326         return connection().local().flowController().consumeBytes(stream, bytes);
327     }
328 
329     private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame, ChannelPromise promise) {
330         if (frame.lastStreamId() > -1) {
331             frame.release();
332             throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
333         }
334 
335         int lastStreamCreated = connection().remote().lastStreamCreated();
336         long lastStreamId = lastStreamCreated + ((long) frame.extraStreamIds()) * 2;
337         // Check if the computation overflowed.
338         if (lastStreamId > Integer.MAX_VALUE) {
339             lastStreamId = Integer.MAX_VALUE;
340         }
341         goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise);
342     }
343 
344     private void writeHeadersFrame(
345             final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame, final ChannelPromise promise) {
346 
347         if (isStreamIdValid(headersFrame.stream().id())) {
348             encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(),
349                     headersFrame.isEndStream(), promise);
350         } else {
351             final DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) headersFrame.stream();
352             final Http2Connection connection = connection();
353             final int streamId = connection.local().incrementAndGetNextStreamId();
354             if (streamId < 0) {
355                 promise.setFailure(new Http2NoMoreStreamIdsException());
356                 return;
357             }
358             stream.id = streamId;
359 
360             // TODO: This depends on the fact that the connection based API will create Http2Stream objects
361             // synchronously. We should investigate how to refactor this later on when we consolidate some layers.
362             assert frameStreamToInitialize == null;
363             frameStreamToInitialize = stream;
364 
365             // TODO(buchgr): Once Http2Stream2 and Http2Stream are merged this is no longer necessary.
366             final ChannelPromise writePromise = ctx.newPromise();
367 
368             encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
369                     headersFrame.isEndStream(), writePromise);
370             if (writePromise.isDone()) {
371                 notifyHeaderWritePromise(writePromise, promise);
372             } else {
373                 numBufferedStreams++;
374 
375                 writePromise.addListener(new ChannelFutureListener() {
376                     @Override
377                     public void operationComplete(ChannelFuture future) throws Exception {
378                         numBufferedStreams--;
379 
380                         notifyHeaderWritePromise(future, promise);
381                     }
382                 });
383             }
384         }
385     }
386 
387     private static void notifyHeaderWritePromise(ChannelFuture future, ChannelPromise promise) {
388         Throwable cause = future.cause();
389         if (cause == null) {
390             promise.setSuccess();
391         } else {
392             promise.setFailure(cause);
393         }
394     }
395 
396     private void onStreamActive0(Http2Stream stream) {
397         if (connection().local().isValidStreamId(stream.id())) {
398             return;
399         }
400 
401         DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
402         onHttp2StreamStateChanged(ctx, stream2);
403     }
404 
405     private final class ConnectionListener extends Http2ConnectionAdapter {
406 
407         @Override
408         public void onStreamAdded(Http2Stream stream) {
409              if (frameStreamToInitialize != null && stream.id() == frameStreamToInitialize.id()) {
410                  frameStreamToInitialize.setStreamAndProperty(streamKey, stream);
411                  frameStreamToInitialize = null;
412              }
413          }
414 
415         @Override
416         public void onStreamActive(Http2Stream stream) {
417             onStreamActive0(stream);
418         }
419 
420         @Override
421         public void onStreamClosed(Http2Stream stream) {
422             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
423             if (stream2 != null) {
424                 onHttp2StreamStateChanged(ctx, stream2);
425             }
426         }
427 
428         @Override
429         public void onStreamHalfClosed(Http2Stream stream) {
430             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
431             if (stream2 != null) {
432                 onHttp2StreamStateChanged(ctx, stream2);
433             }
434         }
435     }
436 
437     @Override
438     protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Http2Exception http2Ex) {
439         // allow the user to handle it first in the pipeline, and then automatically clean up.
440         // If this is not desired behavior the user can override this method.
441         ctx.fireExceptionCaught(cause);
442         super.onConnectionError(ctx, cause, http2Ex);
443     }
444 
445     /**
446      * Exceptions for unknown streams, that is streams that have no {@link Http2FrameStream} object attached
447      * are simply logged and replied to by sending a RST_STREAM frame.
448      */
449     @Override
450     protected final void onStreamError(ChannelHandlerContext ctx, Throwable cause,
451                                  Http2Exception.StreamException streamException) {
452         int streamId = streamException.streamId();
453         Http2Stream connectionStream = connection().stream(streamId);
454         if (connectionStream == null) {
455             onHttp2UnknownStreamError(ctx, cause, streamException);
456             // Write a RST_STREAM
457             super.onStreamError(ctx, cause, streamException);
458             return;
459         }
460 
461         Http2FrameStream stream = connectionStream.getProperty(streamKey);
462         if (stream == null) {
463             LOG.warn("Stream exception thrown without stream object attached.", cause);
464             // Write a RST_STREAM
465             super.onStreamError(ctx, cause, streamException);
466             return;
467         }
468 
469         onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
470     }
471 
472     void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx, Throwable cause,
473                                    Http2Exception.StreamException streamException) {
474         // Just log....
475         LOG.warn("Stream exception thrown for unkown stream {}.", streamException.streamId(), cause);
476     }
477 
478     @Override
479     protected final boolean isGracefulShutdownComplete() {
480         return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
481     }
482 
483     private final class FrameListener implements Http2FrameListener {
484 
485         @Override
486         public void onUnknownFrame(
487                 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
488             onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
489                     .stream(requireStream(streamId)).retain());
490         }
491 
492         @Override
493         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
494             onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
495         }
496 
497         @Override
498         public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) {
499             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false).retain());
500         }
501 
502         @Override
503         public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) {
504             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true).retain());
505         }
506 
507         @Override
508         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
509             onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
510         }
511 
512         @Override
513         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
514             if (streamId == 0) {
515                 // Ignore connection window updates.
516                 return;
517             }
518             onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
519         }
520 
521         @Override
522         public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
523                                   Http2Headers headers, int streamDependency, short weight, boolean
524                                           exclusive, int padding, boolean endStream) {
525             onHeadersRead(ctx, streamId, headers, padding, endStream);
526         }
527 
528         @Override
529         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
530                                   int padding, boolean endOfStream) {
531             onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
532                                         .stream(requireStream(streamId)));
533         }
534 
535         @Override
536         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
537                               boolean endOfStream) {
538             onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
539                                         .stream(requireStream(streamId)).retain());
540             // We return the bytes in consumeBytes() once the stream channel consumed the bytes.
541             return 0;
542         }
543 
544         @Override
545         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
546             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain());
547         }
548 
549         @Override
550         public void onPriorityRead(
551                 ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {
552             // TODO: Maybe handle me
553         }
554 
555         @Override
556         public void onSettingsAckRead(ChannelHandlerContext ctx) {
557             // TODO: Maybe handle me
558         }
559 
560         @Override
561         public void onPushPromiseRead(
562                 ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding)  {
563             // TODO: Maybe handle me
564         }
565 
566         private Http2FrameStream requireStream(int streamId) {
567             Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
568             if (stream == null) {
569                 throw new IllegalStateException("Stream object required for identifier: " + streamId);
570             }
571             return stream;
572         }
573     }
574 
575     void onUpgradeEvent(ChannelHandlerContext ctx, UpgradeEvent evt) {
576         ctx.fireUserEventTriggered(evt);
577     }
578 
579     void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream,
580                                          @SuppressWarnings("unused") boolean writable) {
581         ctx.fireUserEventTriggered(Http2FrameStreamEvent.writabilityChanged(stream));
582     }
583 
584     void onHttp2StreamStateChanged(ChannelHandlerContext ctx, Http2FrameStream stream) {
585         ctx.fireUserEventTriggered(Http2FrameStreamEvent.stateChanged(stream));
586     }
587 
588     void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
589         ctx.fireChannelRead(frame);
590     }
591 
592     void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
593         ctx.fireExceptionCaught(cause);
594     }
595 
596     final boolean isWritable(DefaultHttp2FrameStream stream) {
597         Http2Stream s = stream.stream;
598         return s != null && connection().remote().flowController().isWritable(s);
599     }
600 
601     private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
602         @Override
603         public void writabilityChanged(Http2Stream stream) {
604             Http2FrameStream frameStream = stream.getProperty(streamKey);
605             if (frameStream == null) {
606                 return;
607             }
608             onHttp2StreamWritabilityChanged(
609                     ctx, frameStream, connection().remote().flowController().isWritable(stream));
610         }
611     }
612 
613     /**
614      * {@link Http2FrameStream} implementation.
615      */
616     // TODO(buchgr): Merge Http2FrameStream and Http2Stream.
617     static class DefaultHttp2FrameStream implements Http2FrameStream {
618 
619         private volatile int id = -1;
620         volatile Http2Stream stream;
621 
622         DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
623             assert id == -1 || stream.id() == id;
624             this.stream = stream;
625             stream.setProperty(streamKey, this);
626             return this;
627         }
628 
629         @Override
630         public int id() {
631             Http2Stream stream = this.stream;
632             return stream == null ? id : stream.id();
633         }
634 
635         @Override
636         public State state() {
637             Http2Stream stream = this.stream;
638             return stream == null ? State.IDLE : stream.state();
639         }
640 
641         @Override
642         public String toString() {
643             return String.valueOf(id());
644         }
645     }
646 }