View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInboundHandler;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.handler.codec.UnsupportedMessageTypeException;
25  import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
26  import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
27  import io.netty.handler.codec.http2.Http2Stream.State;
28  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
29  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.ReferenceCounted;
32  import io.netty.util.internal.UnstableApi;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
37  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
38  
39  /**
40   * <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over this API.
41   * This API is targeted to eventually replace or reduce the need for the {@link Http2ConnectionHandler} API.
42   *
43   * <p>A HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
44   * frame, a {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
45   * objects received via {@link #write} are converted to the HTTP/2 wire format. HTTP/2 frames specific to a stream
46   * implement the {@link Http2StreamFrame} interface. The {@link Http2FrameCodec} is instantiated using the
47   * {@link Http2FrameCodecBuilder}. It's recommended for channel handlers to inherit from the
48   * {@link Http2ChannelDuplexHandler}, as it provides additional functionality like iterating over all active streams or
49   * creating outbound streams.
50   *
51   * <h3>Stream Lifecycle</h3>
52   *
53   * The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
54   * {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
55   * {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream.
56   *
57   * <p>{@link Http2StreamFrame}s read from the channel always have a {@link Http2FrameStream} object set, while when writing a
58   * {@link Http2StreamFrame} the application code needs to set a {@link Http2FrameStream} object using
59   * {@link Http2StreamFrame#stream(Http2FrameStream)}.
60   *
61   * <h3>Flow control</h3>
62   *
63   * The frame codec automatically increments stream and connection flow control windows.
64   *
65   * <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
66   * number of bytes and the corresponding stream identifier set to the frame codec.
67   *
68   * <p>The local stream-level flow control window can be changed by writing a {@link Http2SettingsFrame} with the
69   * {@link Http2Settings#initialWindowSize()} set to the targeted value.
70   *
71   * <p>The connection-level flow control window can be changed by writing a {@link Http2WindowUpdateFrame} with the
72   * desired window size <em>increment</em> in bytes and the stream identifier set to {@code 0}. By default the initial
73   * connection-level flow control window is the same as initial stream-level flow control window.
74   *
75   * <h3>New inbound Streams</h3>
76   *
77   * The first frame of a HTTP/2 stream must be a {@link Http2HeadersFrame}, which will have a {@link Http2FrameStream}
78   * object attached.
79   *
80   * <h3>New outbound Streams</h3>
81   *
82   * An outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via
83   * {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
84   * attached.
85   *
86   * <pre>
87   *     final Http2FrameStream stream = handler.newStream();
88   *     ctx.write(headersFrame.stream(stream)).addListener(new ChannelFutureListener() {
89   *
90   *         @Override
91   *         public void operationComplete(ChannelFuture f) {
92   *             if (f.isSuccess()) {
93   *                 // Stream is active and stream.id() returns a valid stream identifier.
94   *                 System.out.println("New stream with id " + stream.id() + " created.");
95   *             } else {
96   *                 // Stream failed to become active. Handle error.
97   *                 if (f.cause() instanceof Http2NoMoreStreamIdsException) {
98   *
99   *                 } else if (f.cause() instanceof Http2GoAwayException) {
100  *
101  *                 } else {
102  *
103  *                 }
104  *             }
105  *         }
106  *     });
107  * </pre>
108  *
109  * <p>If a new stream cannot be created due to stream id exhaustion of the endpoint, the {@link ChannelPromise} of the
110  * HEADERS frame will fail with a {@link Http2NoMoreStreamIdsException}.
111  *
112  * <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
113  * {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
114  * the {@link Http2FrameCodec} can be built with
115  * {@link Http2FrameCodecBuilder#encoderEnforceMaxConcurrentStreams(boolean)} enabled, in which case a new stream and
116  * its associated frames will be buffered until either the limit is increased or an active stream is closed. It's,
117  * however, possible that a buffered stream will never become active. That is, the channel might
118  * get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
119  * {@link Http2ChannelClosedException}. In the second case, all writes of buffered streams with an identifier less than
120  * the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
121  *
122  * <h3>Error Handling</h3>
123  *
124  * Exceptions and errors are propagated via {@link ChannelInboundHandler#exceptionCaught}. Exceptions that apply to
125  * a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding
126  * {@link Http2FrameStream} object attached.
127  *
128  * <h3>Reference Counting</h3>
129  *
130  * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
131  * reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before
132  * propagating a reference counted object through the pipeline, and thus an application handler needs to release such
133  * an object after having consumed it. For more information on reference counting take a look at
134  * http://netty.io/wiki/reference-counted-objects.html
135  *
136  * <h3>HTTP Upgrade</h3>
137  *
138  * Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
139  * HTTP-to-HTTP/2 conversion is performed automatically.
140  */
141 @UnstableApi
142 public class Http2FrameCodec extends Http2ConnectionHandler {
143 
    /** Logger used for stream errors that cannot be delivered to a handler. */
    private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);

    /** Property key under which a stream's {@link DefaultHttp2FrameStream} is stored on its {@link Http2Stream}. */
    protected final PropertyKey streamKey;
    /** Property key set to {@code true} on the stream that carries a cleartext HTTP upgrade request. */
    private final PropertyKey upgradeKey;

    /** Initial window size read from the initial settings; checked against {@code null} before it is used. */
    private final Integer initialFlowControlWindowSize;

    /** Context of this handler; assigned in {@link #handlerAdded(ChannelHandlerContext)}. */
    private ChannelHandlerContext ctx;

    /** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/
    private int numBufferedStreams;
    /** New outbound stream whose HEADERS are being written; bound and cleared again in {@code onStreamAdded}. */
    private DefaultHttp2FrameStream frameStreamToInitialize;
156 
157     Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings) {
158         super(decoder, encoder, initialSettings);
159 
160         decoder.frameListener(new FrameListener());
161         connection().addListener(new ConnectionListener());
162         connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
163         streamKey = connection().newKey();
164         upgradeKey = connection().newKey();
165         initialFlowControlWindowSize = initialSettings.initialWindowSize();
166     }
167 
168     /**
169      * Creates a new outbound/local stream.
170      */
171     DefaultHttp2FrameStream newStream() {
172         return new DefaultHttp2FrameStream();
173     }
174 
175     /**
176      * Iterates over all active HTTP/2 streams.
177      *
178      * <p>This method must not be called outside of the event loop.
179      */
180     final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
181         assert ctx.executor().inEventLoop();
182 
183         connection().forEachActiveStream(new Http2StreamVisitor() {
184             @Override
185             public boolean visit(Http2Stream stream) {
186                 try {
187                     return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
188                 } catch (Throwable cause) {
189                     onError(ctx, false, cause);
190                     return false;
191                 }
192             }
193         });
194     }
195 
196     @Override
197     public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
198         this.ctx = ctx;
199         super.handlerAdded(ctx);
200         handlerAdded0(ctx);
201         // Must be after Http2ConnectionHandler does its initialization in handlerAdded above.
202         // The server will not send a connection preface so we are good to send a window update.
203         Http2Connection connection = connection();
204         if (connection.isServer()) {
205             tryExpandConnectionFlowControlWindow(connection);
206         }
207     }
208 
209     private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
210         if (initialFlowControlWindowSize != null) {
211             // The window size in the settings explicitly excludes the connection window. So we manually manipulate the
212             // connection window to accommodate more concurrent data per connection.
213             Http2Stream connectionStream = connection.connectionStream();
214             Http2LocalFlowController localFlowController = connection.local().flowController();
215             final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
216             // Only increase the connection window, don't decrease it.
217             if (delta > 0) {
218                 // Double the delta just so a single stream can't exhaust the connection window.
219                 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
220                 flush(ctx);
221             }
222         }
223     }
224 
225     void handlerAdded0(@SuppressWarnings("unsed") ChannelHandlerContext ctx) throws Exception {
226         // sub-class can override this for extra steps that needs to be done when the handler is added.
227     }
228 
229     /**
230      * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
231      * HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
232      */
233     @Override
234     public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
235         if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
236             // The user event implies that we are on the client.
237             tryExpandConnectionFlowControlWindow(connection());
238         } else if (evt instanceof UpgradeEvent) {
239             UpgradeEvent upgrade = (UpgradeEvent) evt;
240             try {
241                 onUpgradeEvent(ctx, upgrade.retain());
242                 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
243                 if (stream.getProperty(streamKey) == null) {
244                     // TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
245                     // The stream was already made active, but ctx may have been null so it wasn't initialized.
246                     // https://github.com/netty/netty/issues/4942
247                     onStreamActive0(stream);
248                 }
249                 upgrade.upgradeRequest().headers().setInt(
250                         HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
251                 stream.setProperty(upgradeKey, true);
252                 InboundHttpToHttp2Adapter.handle(
253                         ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest().retain());
254             } finally {
255                 upgrade.release();
256             }
257             return;
258         }
259         super.userEventTriggered(ctx, evt);
260     }
261 
262     /**
263      * Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
264      * streams.
265      */
266     @Override
267     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
268         if (msg instanceof Http2DataFrame) {
269             Http2DataFrame dataFrame = (Http2DataFrame) msg;
270             encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
271                     dataFrame.padding(), dataFrame.isEndStream(), promise);
272         } else if (msg instanceof Http2HeadersFrame) {
273             writeHeadersFrame(ctx, (Http2HeadersFrame) msg, promise);
274         } else if (msg instanceof Http2WindowUpdateFrame) {
275             Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
276             Http2FrameStream frameStream = frame.stream();
277             // It is legit to send a WINDOW_UPDATE frame for the connection stream. The parent channel doesn't attempt
278             // to set the Http2FrameStream so we assume if it is null the WINDOW_UPDATE is for the connection stream.
279             try {
280                 if (frameStream == null) {
281                     increaseInitialConnectionWindow(frame.windowSizeIncrement());
282                 } else {
283                     consumeBytes(frameStream.id(), frame.windowSizeIncrement());
284                 }
285                 promise.setSuccess();
286             } catch (Throwable t) {
287                 promise.setFailure(t);
288             }
289         } else if (msg instanceof Http2ResetFrame) {
290             Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
291             encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
292         } else if (msg instanceof Http2PingFrame) {
293             Http2PingFrame frame = (Http2PingFrame) msg;
294             encoder().writePing(ctx, frame.ack(), frame.content(), promise);
295         } else if (msg instanceof Http2SettingsFrame) {
296             encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings(), promise);
297         } else if (msg instanceof Http2GoAwayFrame) {
298             writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
299         } else if (msg instanceof Http2UnknownFrame) {
300             Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
301             encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
302                     unknownFrame.flags(), unknownFrame.content(), promise);
303         } else if (!(msg instanceof Http2Frame)) {
304             ctx.write(msg, promise);
305         } else {
306             ReferenceCountUtil.release(msg);
307             throw new UnsupportedMessageTypeException(msg);
308         }
309     }
310 
311     private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
312         // The LocalFlowController is responsible for detecting over/under flow.
313         connection().local().flowController().incrementWindowSize(connection().connectionStream(), deltaBytes);
314     }
315 
316     final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
317         Http2Stream stream = connection().stream(streamId);
318         // Upgraded requests are ineligible for stream control. We add the null check
319         // in case the stream has been deregistered.
320         if (stream != null && streamId == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
321             Boolean upgraded = stream.getProperty(upgradeKey);
322             if (Boolean.TRUE.equals(upgraded)) {
323                 return false;
324             }
325         }
326 
327         return connection().local().flowController().consumeBytes(stream, bytes);
328     }
329 
330     private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame, ChannelPromise promise) {
331         if (frame.lastStreamId() > -1) {
332             frame.release();
333             throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
334         }
335 
336         int lastStreamCreated = connection().remote().lastStreamCreated();
337         long lastStreamId = lastStreamCreated + ((long) frame.extraStreamIds()) * 2;
338         // Check if the computation overflowed.
339         if (lastStreamId > Integer.MAX_VALUE) {
340             lastStreamId = Integer.MAX_VALUE;
341         }
342         goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise);
343     }
344 
345     private void writeHeadersFrame(
346             final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame, final ChannelPromise promise) {
347 
348         if (isStreamIdValid(headersFrame.stream().id())) {
349             encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(),
350                     headersFrame.isEndStream(), promise);
351         } else {
352             final DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) headersFrame.stream();
353             final Http2Connection connection = connection();
354             final int streamId = connection.local().incrementAndGetNextStreamId();
355             if (streamId < 0) {
356                 promise.setFailure(new Http2NoMoreStreamIdsException());
357                 return;
358             }
359             stream.id = streamId;
360 
361             // TODO: This depends on the fact that the connection based API will create Http2Stream objects
362             // synchronously. We should investigate how to refactor this later on when we consolidate some layers.
363             assert frameStreamToInitialize == null;
364             frameStreamToInitialize = stream;
365 
366             // TODO(buchgr): Once Http2Stream2 and Http2Stream are merged this is no longer necessary.
367             final ChannelPromise writePromise = ctx.newPromise();
368 
369             encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
370                     headersFrame.isEndStream(), writePromise);
371             if (writePromise.isDone()) {
372                 notifyHeaderWritePromise(writePromise, promise);
373             } else {
374                 numBufferedStreams++;
375 
376                 writePromise.addListener(new ChannelFutureListener() {
377                     @Override
378                     public void operationComplete(ChannelFuture future) throws Exception {
379                         numBufferedStreams--;
380 
381                         notifyHeaderWritePromise(future, promise);
382                     }
383                 });
384             }
385         }
386     }
387 
388     private static void notifyHeaderWritePromise(ChannelFuture future, ChannelPromise promise) {
389         Throwable cause = future.cause();
390         if (cause == null) {
391             promise.setSuccess();
392         } else {
393             promise.setFailure(cause);
394         }
395     }
396 
397     private void onStreamActive0(Http2Stream stream) {
398         if (connection().local().isValidStreamId(stream.id())) {
399             return;
400         }
401 
402         DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
403         onHttp2StreamStateChanged(ctx, stream2);
404     }
405 
406     private final class ConnectionListener extends Http2ConnectionAdapter {
407 
408         @Override
409         public void onStreamAdded(Http2Stream stream) {
410              if (frameStreamToInitialize != null && stream.id() == frameStreamToInitialize.id()) {
411                  frameStreamToInitialize.setStreamAndProperty(streamKey, stream);
412                  frameStreamToInitialize = null;
413              }
414          }
415 
416         @Override
417         public void onStreamActive(Http2Stream stream) {
418             onStreamActive0(stream);
419         }
420 
421         @Override
422         public void onStreamClosed(Http2Stream stream) {
423             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
424             if (stream2 != null) {
425                 onHttp2StreamStateChanged(ctx, stream2);
426             }
427         }
428 
429         @Override
430         public void onStreamHalfClosed(Http2Stream stream) {
431             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
432             if (stream2 != null) {
433                 onHttp2StreamStateChanged(ctx, stream2);
434             }
435         }
436     }
437 
438     @Override
439     protected void onConnectionError(
440             ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
441         if (!outbound) {
442             // allow the user to handle it first in the pipeline, and then automatically clean up.
443             // If this is not desired behavior the user can override this method.
444             //
445             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
446             ctx.fireExceptionCaught(cause);
447         }
448         super.onConnectionError(ctx, outbound, cause, http2Ex);
449     }
450 
451     /**
452      * Exceptions for unknown streams, that is streams that have no {@link Http2FrameStream} object attached
453      * are simply logged and replied to by sending a RST_STREAM frame.
454      */
455     @Override
456     protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
457                                  Http2Exception.StreamException streamException) {
458         int streamId = streamException.streamId();
459         Http2Stream connectionStream = connection().stream(streamId);
460         if (connectionStream == null) {
461             onHttp2UnknownStreamError(ctx, cause, streamException);
462             // Write a RST_STREAM
463             super.onStreamError(ctx, outbound, cause, streamException);
464             return;
465         }
466 
467         Http2FrameStream stream = connectionStream.getProperty(streamKey);
468         if (stream == null) {
469             LOG.warn("Stream exception thrown without stream object attached.", cause);
470             // Write a RST_STREAM
471             super.onStreamError(ctx, outbound, cause, streamException);
472             return;
473         }
474 
475         if (!outbound) {
476             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
477             onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
478         }
479     }
480 
481     void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx, Throwable cause,
482                                    Http2Exception.StreamException streamException) {
483         // Just log....
484         LOG.warn("Stream exception thrown for unkown stream {}.", streamException.streamId(), cause);
485     }
486 
487     @Override
488     protected final boolean isGracefulShutdownComplete() {
489         return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
490     }
491 
492     private final class FrameListener implements Http2FrameListener {
493 
494         @Override
495         public void onUnknownFrame(
496                 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
497             onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
498                     .stream(requireStream(streamId)).retain());
499         }
500 
501         @Override
502         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
503             onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
504         }
505 
506         @Override
507         public void onPingRead(ChannelHandlerContext ctx, long data) {
508             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
509         }
510 
511         @Override
512         public void onPingAckRead(ChannelHandlerContext ctx, long data) {
513             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
514         }
515 
516         @Override
517         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
518             onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
519         }
520 
521         @Override
522         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
523             if (streamId == 0) {
524                 // Ignore connection window updates.
525                 return;
526             }
527             onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
528         }
529 
530         @Override
531         public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
532                                   Http2Headers headers, int streamDependency, short weight, boolean
533                                           exclusive, int padding, boolean endStream) {
534             onHeadersRead(ctx, streamId, headers, padding, endStream);
535         }
536 
537         @Override
538         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
539                                   int padding, boolean endOfStream) {
540             onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
541                                         .stream(requireStream(streamId)));
542         }
543 
544         @Override
545         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
546                               boolean endOfStream) {
547             onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
548                                         .stream(requireStream(streamId)).retain());
549             // We return the bytes in consumeBytes() once the stream channel consumed the bytes.
550             return 0;
551         }
552 
553         @Override
554         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
555             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain());
556         }
557 
558         @Override
559         public void onPriorityRead(
560                 ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {
561             // TODO: Maybe handle me
562         }
563 
564         @Override
565         public void onSettingsAckRead(ChannelHandlerContext ctx) {
566             // TODO: Maybe handle me
567         }
568 
569         @Override
570         public void onPushPromiseRead(
571                 ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding)  {
572             // TODO: Maybe handle me
573         }
574 
575         private Http2FrameStream requireStream(int streamId) {
576             Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
577             if (stream == null) {
578                 throw new IllegalStateException("Stream object required for identifier: " + streamId);
579             }
580             return stream;
581         }
582     }
583 
584     void onUpgradeEvent(ChannelHandlerContext ctx, UpgradeEvent evt) {
585         ctx.fireUserEventTriggered(evt);
586     }
587 
588     void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream,
589                                          @SuppressWarnings("unused") boolean writable) {
590         ctx.fireUserEventTriggered(Http2FrameStreamEvent.writabilityChanged(stream));
591     }
592 
593     void onHttp2StreamStateChanged(ChannelHandlerContext ctx, Http2FrameStream stream) {
594         ctx.fireUserEventTriggered(Http2FrameStreamEvent.stateChanged(stream));
595     }
596 
597     void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
598         ctx.fireChannelRead(frame);
599     }
600 
601     void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
602         ctx.fireExceptionCaught(cause);
603     }
604 
605     final boolean isWritable(DefaultHttp2FrameStream stream) {
606         Http2Stream s = stream.stream;
607         return s != null && connection().remote().flowController().isWritable(s);
608     }
609 
610     private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
611         @Override
612         public void writabilityChanged(Http2Stream stream) {
613             Http2FrameStream frameStream = stream.getProperty(streamKey);
614             if (frameStream == null) {
615                 return;
616             }
617             onHttp2StreamWritabilityChanged(
618                     ctx, frameStream, connection().remote().flowController().isWritable(stream));
619         }
620     }
621 
622     /**
623      * {@link Http2FrameStream} implementation.
624      */
625     // TODO(buchgr): Merge Http2FrameStream and Http2Stream.
626     static class DefaultHttp2FrameStream implements Http2FrameStream {
627 
628         private volatile int id = -1;
629         volatile Http2Stream stream;
630 
631         DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
632             assert id == -1 || stream.id() == id;
633             this.stream = stream;
634             stream.setProperty(streamKey, this);
635             return this;
636         }
637 
638         @Override
639         public int id() {
640             Http2Stream stream = this.stream;
641             return stream == null ? id : stream.id();
642         }
643 
644         @Override
645         public State state() {
646             Http2Stream stream = this.stream;
647             return stream == null ? State.IDLE : stream.state();
648         }
649 
650         @Override
651         public String toString() {
652             return String.valueOf(id());
653         }
654     }
655 }