View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.UnsupportedMessageTypeException;
26  import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
27  import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
28  import io.netty.handler.codec.http2.Http2Stream.State;
29  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
30  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
31  import io.netty.util.ReferenceCountUtil;
32  import io.netty.util.ReferenceCounted;
33  import io.netty.util.collection.IntObjectHashMap;
34  import io.netty.util.collection.IntObjectMap;
35  import io.netty.util.internal.logging.InternalLogger;
36  import io.netty.util.internal.logging.InternalLoggerFactory;
37  
38  import static io.netty.buffer.ByteBufUtil.writeAscii;
39  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
40  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
41  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
42  import static io.netty.util.internal.logging.InternalLogLevel.DEBUG;
43  
44  /**
45   * <p>An HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
46   * frame, an {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
47   * objects received via {@link #write} are converted to the HTTP/2 wire format. HTTP/2 frames specific to a stream
48   * implement the {@link Http2StreamFrame} interface. The {@link Http2FrameCodec} is instantiated using the
49   * {@link Http2FrameCodecBuilder}. It's recommended for channel handlers to inherit from the
50   * {@link Http2ChannelDuplexHandler}, as it provides additional functionality like iterating over all active streams or
51   * creating outbound streams.
52   *
53   * <h3>Stream Lifecycle</h3>
54   * <p>
55   * The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
56   * {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
57   * {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream.
58   *
59   * <p>{@link Http2StreamFrame}s read from the channel always a {@link Http2FrameStream} object set, while when writing a
60   * {@link Http2StreamFrame} the application code needs to set a {@link Http2FrameStream} object using
61   * {@link Http2StreamFrame#stream(Http2FrameStream)}.
62   *
63   * <h3>Flow control</h3>
64   * <p>
65   * The frame codec automatically increments stream and connection flow control windows.
66   *
67   * <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
68   * number of bytes and the corresponding stream identifier set to the frame codec.
69   *
70   * <p>The local stream-level flow control window can be changed by writing a {@link Http2SettingsFrame} with the
71   * {@link Http2Settings#initialWindowSize()} set to the targeted value.
72   *
73   * <p>The connection-level flow control window can be changed by writing a {@link Http2WindowUpdateFrame} with the
74   * desired window size <em>increment</em> in bytes and the stream identifier set to {@code 0}. By default the initial
75   * connection-level flow control window is the same as initial stream-level flow control window.
76   *
77   * <h3>New inbound Streams</h3>
78   * <p>
79   * The first frame of an HTTP/2 stream must be an {@link Http2HeadersFrame}, which will have an {@link Http2FrameStream}
80   * object attached.
81   *
82   * <h3>New outbound Streams</h3>
83   * <p>
84   * A outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via
85   * {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
86   * attached.
87   *
88   * <pre> {@code
89   *     final Http2Stream2 stream = handler.newStream();
90   *     ctx.write(headersFrame.stream(stream)).addListener(new ChannelFutureListener() {
91   *
92   *         @Override
93   *         public void operationComplete(ChannelFuture f) {
94   *             if (f.isSuccess()) {
95   *                 // Stream is active and stream.id() returns a valid stream identifier.
96   *                 System.out.println("New stream with id " + stream.id() + " created.");
97   *             } else {
98   *                 // Stream failed to become active. Handle error.
99   *                 if (f.cause() instanceof Http2NoMoreStreamIdsException) {
100  *
101  *                 } else if (f.cause() instanceof Http2GoAwayException) {
102  *
103  *                 } else {
104  *
105  *                 }
106  *             }
107  *         }
108  *     }
109  *     }
110  * </pre>
111  *
112  * <p>If a new stream cannot be created due to stream id exhaustion of the endpoint, the {@link ChannelPromise} of the
113  * HEADERS frame will fail with a {@link Http2NoMoreStreamIdsException}.
114  *
115  * <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
116  * {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
117  * the {@link Http2FrameCodec} can be build with
118  * {@link Http2FrameCodecBuilder#encoderEnforceMaxConcurrentStreams(boolean)} enabled, in which case a new stream and
119  * its associated frames will be buffered until either the limit is increased or an active stream is closed. It's,
120  * however, possible that a buffered stream will never become active. That is, the channel might
121  * get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
122  * {@link Http2ChannelClosedException}. In the second case, all writes of buffered streams with an identifier less than
123  * the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
124  *
125  * <h3>Error Handling</h3>
126  * <p>
127  * Exceptions and errors are propagated via {@link ChannelInboundHandler#exceptionCaught}. Exceptions that apply to
128  * a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding
129  * {@link Http2FrameStream} object attached.
130  *
131  * <h3>Reference Counting</h3>
132  * <p>
133  * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
134  * reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before
135  * propagating a reference counted object through the pipeline, and thus an application handler needs to release such
136  * an object after having consumed it. For more information on reference counting take a look at
137  * <a href="https://netty.io/wiki/reference-counted-objects.html">Reference counted objects</a>
138  *
139  * <h3>HTTP Upgrade</h3>
140  * <p>
141  * Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
142  * HTTP-to-HTTP/2 conversion is performed automatically.
143  */
144 public class Http2FrameCodec extends Http2ConnectionHandler {
145 
146     private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);
147 
148     private static final Class<?>[] SUPPORTED_MESSAGES = new Class[] {
149             Http2DataFrame.class, Http2HeadersFrame.class, Http2WindowUpdateFrame.class, Http2ResetFrame.class,
150             Http2PingFrame.class, Http2SettingsFrame.class, Http2SettingsAckFrame.class, Http2GoAwayFrame.class,
151             Http2PushPromiseFrame.class, Http2PriorityFrame.class, Http2UnknownFrame.class };
152 
153     protected final PropertyKey streamKey;
154     private final PropertyKey upgradeKey;
155 
156     private final Integer initialFlowControlWindowSize;
157 
158     ChannelHandlerContext ctx;
159 
160     /**
161      * Number of buffered streams if the {@link StreamBufferingEncoder} is used.
162      **/
163     private int numBufferedStreams;
164     private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
165             new IntObjectHashMap<DefaultHttp2FrameStream>(8);
166 
167     protected Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder,
168                               Http2Settings initialSettings, boolean decoupleCloseAndGoAway, boolean flushPreface) {
169         super(decoder, encoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
170 
171         decoder.frameListener(new FrameListener());
172         connection().addListener(new ConnectionListener());
173         connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
174         streamKey = connection().newKey();
175         upgradeKey = connection().newKey();
176         initialFlowControlWindowSize = initialSettings.initialWindowSize();
177     }
178 
179     /**
180      * Creates a new outbound/local stream.
181      */
182     DefaultHttp2FrameStream newStream() {
183         return new DefaultHttp2FrameStream();
184     }
185 
186     /**
187      * Iterates over all active HTTP/2 streams.
188      *
189      * <p>This method must not be called outside of the event loop.
190      */
191     final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
192         assert ctx.executor().inEventLoop();
193         if (connection().numActiveStreams() > 0) {
194             connection().forEachActiveStream(new Http2StreamVisitor() {
195                 @Override
196                 public boolean visit(Http2Stream stream) {
197                     try {
198                         return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
199                     } catch (Throwable cause) {
200                         onError(ctx, false, cause);
201                         return false;
202                     }
203                 }
204             });
205         }
206     }
207 
208     /**
209      * Retrieve the number of streams currently in the process of being initialized.
210      * <p>
211      * This is package-private for testing only.
212      */
213     int numInitializingStreams() {
214         return frameStreamToInitializeMap.size();
215     }
216 
217     @Override
218     public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
219         this.ctx = ctx;
220         super.handlerAdded(ctx);
221         handlerAdded0(ctx);
222         // Must be after Http2ConnectionHandler does its initialization in handlerAdded above.
223         // The server will not send a connection preface so we are good to send a window update.
224         Http2Connection connection = connection();
225         if (connection.isServer()) {
226             tryExpandConnectionFlowControlWindow(connection);
227         }
228     }
229 
230     private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
231         if (initialFlowControlWindowSize != null) {
232             // The window size in the settings explicitly excludes the connection window. So we manually manipulate the
233             // connection window to accommodate more concurrent data per connection.
234             Http2Stream connectionStream = connection.connectionStream();
235             Http2LocalFlowController localFlowController = connection.local().flowController();
236             final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
237             // Only increase the connection window, don't decrease it.
238             if (delta > 0) {
239                 // Double the delta just so a single stream can't exhaust the connection window.
240                 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
241                 flush(ctx);
242             }
243         }
244     }
245 
246     void handlerAdded0(@SuppressWarnings("unsed") ChannelHandlerContext ctx) throws Exception {
247         // sub-class can override this for extra steps that needs to be done when the handler is added.
248     }
249 
250     /**
251      * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
252      * HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
253      */
254     @Override
255     public final void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
256         if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
257             // The user event implies that we are on the client.
258             tryExpandConnectionFlowControlWindow(connection());
259 
260             // We schedule this on the EventExecutor to allow to have any extra handlers added to the pipeline
261             // before we pass the event to the next handler. This is needed as the event may be called from within
262             // handlerAdded(...) which will be run before other handlers will be added to the pipeline.
263             ctx.executor().execute(new Runnable() {
264                 @Override
265                 public void run() {
266                     ctx.fireUserEventTriggered(evt);
267                 }
268             });
269         } else if (evt instanceof UpgradeEvent) {
270             UpgradeEvent upgrade = (UpgradeEvent) evt;
271             try {
272                 onUpgradeEvent(ctx, upgrade.retain());
273                 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
274                 if (stream.getProperty(streamKey) == null) {
275                     // TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
276                     // The stream was already made active, but ctx may have been null so it wasn't initialized.
277                     // https://github.com/netty/netty/issues/4942
278                     onStreamActive0(stream);
279                 }
280                 upgrade.upgradeRequest().headers().setInt(
281                         HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
282                 stream.setProperty(upgradeKey, true);
283                 InboundHttpToHttp2Adapter.handle(
284                         ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest().retain());
285             } finally {
286                 upgrade.release();
287             }
288         } else {
289             onUserEventTriggered(ctx, evt);
290             ctx.fireUserEventTriggered(evt);
291         }
292     }
293 
294     void onUserEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
295         // noop
296     }
297 
298     /**
299      * Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
300      * streams.
301      */
302     @Override
303     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
304         if (msg instanceof Http2DataFrame) {
305             Http2DataFrame dataFrame = (Http2DataFrame) msg;
306             encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
307                     dataFrame.padding(), dataFrame.isEndStream(), promise);
308         } else if (msg instanceof Http2HeadersFrame) {
309             writeHeadersFrame(ctx, (Http2HeadersFrame) msg, promise);
310         } else if (msg instanceof Http2WindowUpdateFrame) {
311             Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
312             Http2FrameStream frameStream = frame.stream();
313             // It is legit to send a WINDOW_UPDATE frame for the connection stream. The parent channel doesn't attempt
314             // to set the Http2FrameStream so we assume if it is null the WINDOW_UPDATE is for the connection stream.
315             try {
316                 if (frameStream == null) {
317                     increaseInitialConnectionWindow(frame.windowSizeIncrement());
318                 } else {
319                     consumeBytes(frameStream.id(), frame.windowSizeIncrement());
320                 }
321                 promise.setSuccess();
322             } catch (Throwable t) {
323                 promise.setFailure(t);
324             }
325         } else if (msg instanceof Http2ResetFrame) {
326             Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
327             int id = rstFrame.stream().id();
328             // Only ever send a reset frame if stream may have existed before as otherwise we may send a RST on a
329             // stream in an invalid state and cause a connection error.
330             if (connection().streamMayHaveExisted(id)) {
331                 encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
332             } else {
333                 ReferenceCountUtil.release(rstFrame);
334                 promise.setFailure(Http2Exception.streamError(
335                         rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed"));
336             }
337         } else if (msg instanceof Http2PingFrame) {
338             Http2PingFrame frame = (Http2PingFrame) msg;
339             encoder().writePing(ctx, frame.ack(), frame.content(), promise);
340         } else if (msg instanceof Http2SettingsFrame) {
341             encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings(), promise);
342         } else if (msg instanceof Http2SettingsAckFrame) {
343             // In the event of manual SETTINGS ACK, it is assumed the encoder will apply the earliest received but not
344             // yet ACKed settings.
345             encoder().writeSettingsAck(ctx, promise);
346         } else if (msg instanceof Http2GoAwayFrame) {
347             writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
348         } else if (msg instanceof Http2PushPromiseFrame) {
349             Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg;
350             writePushPromise(ctx, pushPromiseFrame, promise);
351         } else if (msg instanceof Http2PriorityFrame) {
352             Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg;
353             encoder().writePriority(ctx, priorityFrame.stream().id(), priorityFrame.streamDependency(),
354                     priorityFrame.weight(), priorityFrame.exclusive(), promise);
355         } else if (msg instanceof Http2UnknownFrame) {
356             Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
357             encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
358                     unknownFrame.flags(), unknownFrame.content(), promise);
359         } else if (!(msg instanceof Http2Frame)) {
360             ctx.write(msg, promise);
361         } else {
362             ReferenceCountUtil.release(msg);
363             throw new UnsupportedMessageTypeException(msg, SUPPORTED_MESSAGES);
364         }
365     }
366 
367     private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
368         // The LocalFlowController is responsible for detecting over/under flow.
369         connection().local().flowController().incrementWindowSize(connection().connectionStream(), deltaBytes);
370     }
371 
372     final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
373         Http2Stream stream = connection().stream(streamId);
374         // Upgraded requests are ineligible for stream control. We add the null check
375         // in case the stream has been deregistered.
376         if (stream != null && streamId == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
377             Boolean upgraded = stream.getProperty(upgradeKey);
378             if (Boolean.TRUE.equals(upgraded)) {
379                 return false;
380             }
381         }
382 
383         return connection().local().flowController().consumeBytes(stream, bytes);
384     }
385 
386     private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame, ChannelPromise promise) {
387         if (frame.lastStreamId() > -1) {
388             frame.release();
389             throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
390         }
391 
392         int lastStreamCreated = connection().remote().lastStreamCreated();
393         long lastStreamId = lastStreamCreated + ((long) frame.extraStreamIds()) * 2;
394         // Check if the computation overflowed.
395         if (lastStreamId > Integer.MAX_VALUE) {
396             lastStreamId = Integer.MAX_VALUE;
397         }
398         goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise);
399     }
400 
401     private void writeHeadersFrame(final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame,
402                                    ChannelPromise promise) {
403 
404         if (isStreamIdValid(headersFrame.stream().id())) {
405             encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(),
406                     headersFrame.isEndStream(), promise);
407         } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) headersFrame.stream(), promise)) {
408             promise = promise.unvoid();
409 
410             final int streamId = headersFrame.stream().id();
411 
412             encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
413                     headersFrame.isEndStream(), promise);
414 
415             if (!promise.isDone()) {
416                 numBufferedStreams++;
417                 // Clean up the stream being initialized if writing the headers fails and also
418                 // decrement the number of buffered streams.
419                 promise.addListener(new ChannelFutureListener() {
420                     @Override
421                     public void operationComplete(ChannelFuture channelFuture) {
422                         numBufferedStreams--;
423                         handleHeaderFuture(channelFuture, streamId);
424                     }
425                 });
426             } else {
427                 handleHeaderFuture(promise, streamId);
428             }
429         }
430     }
431 
432     private void writePushPromise(final ChannelHandlerContext ctx, Http2PushPromiseFrame pushPromiseFrame,
433                                   final ChannelPromise promise) {
434         if (isStreamIdValid(pushPromiseFrame.pushStream().id())) {
435             encoder().writePushPromise(ctx, pushPromiseFrame.stream().id(), pushPromiseFrame.pushStream().id(),
436                     pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
437         } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) pushPromiseFrame.pushStream(), promise)) {
438             final int streamId = pushPromiseFrame.stream().id();
439             encoder().writePushPromise(ctx, streamId, pushPromiseFrame.pushStream().id(),
440                     pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
441 
442             if (promise.isDone()) {
443                 handleHeaderFuture(promise, streamId);
444             } else {
445                 numBufferedStreams++;
446                 // Clean up the stream being initialized if writing the headers fails and also
447                 // decrement the number of buffered streams.
448                 promise.addListener(new ChannelFutureListener() {
449                     @Override
450                     public void operationComplete(ChannelFuture channelFuture) {
451                         numBufferedStreams--;
452                         handleHeaderFuture(channelFuture, streamId);
453                     }
454                 });
455             }
456         }
457     }
458 
459     private boolean initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream,
460                                         ChannelPromise promise) {
461         final Http2Connection connection = connection();
462         final int streamId = connection.local().incrementAndGetNextStreamId();
463         if (streamId < 0) {
464             promise.setFailure(new Http2NoMoreStreamIdsException());
465 
466             // Simulate a GOAWAY being received due to stream exhaustion on this connection. We use the maximum
467             // valid stream ID for the current peer.
468             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE :
469                     Integer.MAX_VALUE - 1, NO_ERROR.code(),
470                     writeAscii(ctx.alloc(), "Stream IDs exhausted on local stream creation")));
471 
472             return false;
473         }
474         http2FrameStream.id = streamId;
475 
476         // Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
477         // stream in a field directly we may override the stored field before onStreamAdded(...) was called
478         // and so not correctly set the property for the buffered stream.
479         //
480         // See https://github.com/netty/netty/issues/8692
481         Object old = frameStreamToInitializeMap.put(streamId, http2FrameStream);
482 
483         // We should not re-use ids.
484         assert old == null;
485         return true;
486     }
487 
488     private void handleHeaderFuture(ChannelFuture channelFuture, int streamId) {
489         if (!channelFuture.isSuccess()) {
490             frameStreamToInitializeMap.remove(streamId);
491         }
492     }
493 
494     private void onStreamActive0(Http2Stream stream) {
495         if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID &&
496                 connection().local().isValidStreamId(stream.id())) {
497             return;
498         }
499 
500         DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
501         onHttp2StreamStateChanged(ctx, stream2);
502     }
503 
504     private final class ConnectionListener extends Http2ConnectionAdapter {
505         @Override
506         public void onStreamAdded(Http2Stream stream) {
507             DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());
508 
509             if (frameStream != null) {
510                 frameStream.setStreamAndProperty(streamKey, stream);
511             }
512         }
513 
514         @Override
515         public void onStreamActive(Http2Stream stream) {
516             onStreamActive0(stream);
517         }
518 
519         @Override
520         public void onStreamClosed(Http2Stream stream) {
521             onHttp2StreamStateChanged0(stream);
522         }
523 
524         @Override
525         public void onStreamHalfClosed(Http2Stream stream) {
526             onHttp2StreamStateChanged0(stream);
527         }
528 
529         private void onHttp2StreamStateChanged0(Http2Stream stream) {
530             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
531             if (stream2 != null) {
532                 onHttp2StreamStateChanged(ctx, stream2);
533             }
534         }
535     }
536 
537     @Override
538     protected void onConnectionError(
539             ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
540         if (!outbound) {
541             // allow the user to handle it first in the pipeline, and then automatically clean up.
542             // If this is not desired behavior the user can override this method.
543             //
544             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
545             ctx.fireExceptionCaught(cause);
546         }
547         super.onConnectionError(ctx, outbound, cause, http2Ex);
548     }
549 
550     /**
551      * Exceptions for unknown streams, that is streams that have no {@link Http2FrameStream} object attached
552      * are simply logged and replied to by sending a RST_STREAM frame.
553      */
554     @Override
555     protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
556                                        Http2Exception.StreamException streamException) {
557         int streamId = streamException.streamId();
558         Http2Stream connectionStream = connection().stream(streamId);
559         if (connectionStream == null) {
560             onHttp2UnknownStreamError(ctx, cause, streamException);
561             // Write a RST_STREAM
562             super.onStreamError(ctx, outbound, cause, streamException);
563             return;
564         }
565 
566         Http2FrameStream stream = connectionStream.getProperty(streamKey);
567         if (stream == null) {
568             LOG.warn("{} Stream exception thrown without stream object attached.", ctx.channel(), cause);
569             // Write a RST_STREAM
570             super.onStreamError(ctx, outbound, cause, streamException);
571             return;
572         }
573 
574         if (!outbound) {
575             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
576             onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
577         }
578     }
579 
580     private static void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx,
581             Throwable cause, Http2Exception.StreamException streamException) {
582         // We log here for debugging purposes. This exception will be propagated to the upper layers through other ways:
583         // - fireExceptionCaught
584         // - fireUserEventTriggered(Http2ResetFrame), see Http2MultiplexHandler#channelRead(...)
585         // - by failing write promise
586         // Receiver of the error is responsible for correct handling of this exception.
587         LOG.log(DEBUG, "{} Stream exception thrown for unknown stream {}.",
588                 ctx.channel(), streamException.streamId(), cause);
589     }
590 
591     @Override
592     protected final boolean isGracefulShutdownComplete() {
593         return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
594     }
595 
596     private final class FrameListener implements Http2FrameListener {
597 
598         @Override
599         public void onUnknownFrame(
600                 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
601             if (streamId == 0) {
602                 // Ignore unknown frames on connection stream, for example: HTTP/2 GREASE testing
603                 return;
604             }
605             Http2FrameStream stream = requireStream(streamId);
606             onHttp2Frame(ctx, newHttp2UnknownFrame(frameType, streamId, flags, payload.retain()).stream(stream));
607         }
608 
609         @Override
610         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
611             onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
612         }
613 
614         @Override
615         public void onPingRead(ChannelHandlerContext ctx, long data) {
616             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
617         }
618 
619         @Override
620         public void onPingAckRead(ChannelHandlerContext ctx, long data) {
621             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
622         }
623 
624         @Override
625         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
626             Http2FrameStream stream = requireStream(streamId);
627             onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(stream));
628         }
629 
630         @Override
631         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
632             if (streamId == 0) {
633                 // Ignore connection window updates.
634                 return;
635             }
636             Http2FrameStream stream = requireStream(streamId);
637             onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(stream));
638         }
639 
640         @Override
641         public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
642                                   Http2Headers headers, int streamDependency, short weight, boolean
643                                           exclusive, int padding, boolean endStream) {
644             onHeadersRead(ctx, streamId, headers, padding, endStream);
645         }
646 
647         @Override
648         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
649                                   int padding, boolean endOfStream) {
650             Http2FrameStream stream = requireStream(streamId);
651             onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding).stream(stream));
652         }
653 
654         @Override
655         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
656                               boolean endOfStream) {
657             Http2FrameStream stream = requireStream(streamId);
658             final Http2DataFrame dataframe;
659             try {
660                 dataframe = new DefaultHttp2DataFrame(data.retain(), endOfStream, padding);
661             } catch (IllegalArgumentException e) {
662                 // Might be thrown in case of invalid padding / length.
663                 data.release();
664                 throw e;
665             }
666             dataframe.stream(stream);
667             onHttp2Frame(ctx, dataframe);
668             // We return the bytes in consumeBytes() once the stream channel consumed the bytes.
669             return 0;
670         }
671 
672         @Override
673         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
674             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData.retain()));
675         }
676 
677         @Override
678         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
679                                    short weight, boolean exclusive) {
680 
681             Http2Stream stream = connection().stream(streamId);
682             if (stream == null) {
683                 // The stream was not opened yet, let's just ignore this for now.
684                 return;
685             }
686             Http2FrameStream frameStream = requireStream(streamId);
687             onHttp2Frame(ctx, new DefaultHttp2PriorityFrame(streamDependency, weight, exclusive)
688                     .stream(frameStream));
689         }
690 
691         @Override
692         public void onSettingsAckRead(ChannelHandlerContext ctx) {
693             onHttp2Frame(ctx, Http2SettingsAckFrame.INSTANCE);
694         }
695 
696         @Override
697         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
698                                       Http2Headers headers, int padding) {
699             Http2FrameStream stream = requireStream(streamId);
700             onHttp2Frame(ctx, new DefaultHttp2PushPromiseFrame(headers, padding, promisedStreamId)
701                     .pushStream(new DefaultHttp2FrameStream()
702                             .setStreamAndProperty(streamKey, connection().stream(promisedStreamId)))
703                     .stream(stream));
704         }
705 
706         private Http2FrameStream requireStream(int streamId) {
707             Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
708             if (stream == null) {
709                 throw new IllegalStateException("Stream object required for identifier: " + streamId);
710             }
711             return stream;
712         }
713     }
714 
715     private void onUpgradeEvent(ChannelHandlerContext ctx, UpgradeEvent evt) {
716         ctx.fireUserEventTriggered(evt);
717     }
718 
719     private void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream,
720                                                  @SuppressWarnings("unused") boolean writable) {
721         ctx.fireUserEventTriggered(stream.writabilityChanged);
722     }
723 
724     void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
725         ctx.fireUserEventTriggered(stream.stateChanged);
726     }
727 
728     void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
729         ctx.fireChannelRead(frame);
730     }
731 
732     /**
733      * Create a Http2UnknownFrame. The ownership of the {@link ByteBuf} is transferred.
734      * */
735     protected Http2StreamFrame newHttp2UnknownFrame(byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
736         return new DefaultHttp2UnknownFrame(frameType, flags, payload);
737     }
738 
739     void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
740         ctx.fireExceptionCaught(cause);
741     }
742 
743     private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
744         @Override
745         public void writabilityChanged(Http2Stream stream) {
746             DefaultHttp2FrameStream frameStream = stream.getProperty(streamKey);
747             if (frameStream == null) {
748                 return;
749             }
750             onHttp2StreamWritabilityChanged(
751                     ctx, frameStream, connection().remote().flowController().isWritable(stream));
752         }
753     }
754 
755     /**
756      * {@link Http2FrameStream} implementation.
757      */
758     // TODO(buchgr): Merge Http2FrameStream and Http2Stream.
759     static class DefaultHttp2FrameStream implements Http2FrameStream {
760 
761         private volatile int id = -1;
762         private volatile Http2Stream stream;
763 
764         final Http2FrameStreamEvent stateChanged = Http2FrameStreamEvent.stateChanged(this);
765         final Http2FrameStreamEvent writabilityChanged = Http2FrameStreamEvent.writabilityChanged(this);
766 
767         Channel attachment;
768 
769         DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
770             assert id == -1 || stream.id() == id;
771             this.stream = stream;
772             this.id = stream.id();
773             stream.setProperty(streamKey, this);
774             return this;
775         }
776 
777         @Override
778         public int id() {
779             Http2Stream stream = this.stream;
780             return stream == null ? id : stream.id();
781         }
782 
783         @Override
784         public State state() {
785             Http2Stream stream = this.stream;
786             return stream == null ? State.IDLE : stream.state();
787         }
788 
789         @Override
790         public String toString() {
791             return String.valueOf(id());
792         }
793     }
794 }