View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.UnsupportedMessageTypeException;
26  import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
27  import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
28  import io.netty.handler.codec.http2.Http2Stream.State;
29  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
30  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
31  import io.netty.util.ReferenceCountUtil;
32  import io.netty.util.ReferenceCounted;
33  import io.netty.util.collection.IntObjectHashMap;
34  import io.netty.util.collection.IntObjectMap;
35  import io.netty.util.internal.UnstableApi;
36  import io.netty.util.internal.logging.InternalLogger;
37  import io.netty.util.internal.logging.InternalLoggerFactory;
38  
39  import static io.netty.buffer.ByteBufUtil.writeAscii;
40  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
42  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
43  import static io.netty.util.internal.logging.InternalLogLevel.DEBUG;
44  
45  /**
46   * <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over this API.
47   * This API is targeted to eventually replace or reduce the need for the {@link Http2ConnectionHandler} API.
48   *
49   * <p>An HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
50   * frame, an {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
51   * objects received via {@link #write} are converted to the HTTP/2 wire format. HTTP/2 frames specific to a stream
52   * implement the {@link Http2StreamFrame} interface. The {@link Http2FrameCodec} is instantiated using the
53   * {@link Http2FrameCodecBuilder}. It's recommended for channel handlers to inherit from the
54   * {@link Http2ChannelDuplexHandler}, as it provides additional functionality like iterating over all active streams or
55   * creating outbound streams.
56   *
57   * <h3>Stream Lifecycle</h3>
58   * <p>
59   * The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
60   * {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
61   * {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream.
62   *
63   * <p>{@link Http2StreamFrame}s read from the channel always a {@link Http2FrameStream} object set, while when writing a
64   * {@link Http2StreamFrame} the application code needs to set a {@link Http2FrameStream} object using
65   * {@link Http2StreamFrame#stream(Http2FrameStream)}.
66   *
67   * <h3>Flow control</h3>
68   * <p>
69   * The frame codec automatically increments stream and connection flow control windows.
70   *
71   * <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
72   * number of bytes and the corresponding stream identifier set to the frame codec.
73   *
74   * <p>The local stream-level flow control window can be changed by writing a {@link Http2SettingsFrame} with the
75   * {@link Http2Settings#initialWindowSize()} set to the targeted value.
76   *
77   * <p>The connection-level flow control window can be changed by writing a {@link Http2WindowUpdateFrame} with the
78   * desired window size <em>increment</em> in bytes and the stream identifier set to {@code 0}. By default the initial
79   * connection-level flow control window is the same as initial stream-level flow control window.
80   *
81   * <h3>New inbound Streams</h3>
82   * <p>
83   * The first frame of an HTTP/2 stream must be an {@link Http2HeadersFrame}, which will have an {@link Http2FrameStream}
84   * object attached.
85   *
86   * <h3>New outbound Streams</h3>
87   * <p>
88   * A outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via
89   * {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
90   * attached.
91   *
92   * <pre>
93   *     final Http2Stream2 stream = handler.newStream();
94   *     ctx.write(headersFrame.stream(stream)).addListener(new ChannelFutureListener() {
95   *
96   *         @Override
97   *         public void operationComplete(ChannelFuture f) {
98   *             if (f.isSuccess()) {
99   *                 // Stream is active and stream.id() returns a valid stream identifier.
100  *                 System.out.println("New stream with id " + stream.id() + " created.");
101  *             } else {
102  *                 // Stream failed to become active. Handle error.
103  *                 if (f.cause() instanceof Http2NoMoreStreamIdsException) {
104  *
105  *                 } else if (f.cause() instanceof Http2GoAwayException) {
106  *
107  *                 } else {
108  *
109  *                 }
110  *             }
111  *         }
112  *     }
113  * </pre>
114  *
115  * <p>If a new stream cannot be created due to stream id exhaustion of the endpoint, the {@link ChannelPromise} of the
116  * HEADERS frame will fail with a {@link Http2NoMoreStreamIdsException}.
117  *
118  * <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
119  * {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
120  * the {@link Http2FrameCodec} can be build with
121  * {@link Http2FrameCodecBuilder#encoderEnforceMaxConcurrentStreams(boolean)} enabled, in which case a new stream and
122  * its associated frames will be buffered until either the limit is increased or an active stream is closed. It's,
123  * however, possible that a buffered stream will never become active. That is, the channel might
124  * get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
125  * {@link Http2ChannelClosedException}. In the second case, all writes of buffered streams with an identifier less than
126  * the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
127  *
128  * <h3>Error Handling</h3>
129  * <p>
130  * Exceptions and errors are propagated via {@link ChannelInboundHandler#exceptionCaught}. Exceptions that apply to
131  * a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding
132  * {@link Http2FrameStream} object attached.
133  *
134  * <h3>Reference Counting</h3>
135  * <p>
136  * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
137  * reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before
138  * propagating a reference counted object through the pipeline, and thus an application handler needs to release such
139  * an object after having consumed it. For more information on reference counting take a look at
140  * https://netty.io/wiki/reference-counted-objects.html
141  *
142  * <h3>HTTP Upgrade</h3>
143  * <p>
144  * Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
145  * HTTP-to-HTTP/2 conversion is performed automatically.
146  */
147 @UnstableApi
148 public class Http2FrameCodec extends Http2ConnectionHandler {
149 
150     private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);
151 
152     protected final PropertyKey streamKey;
153     private final PropertyKey upgradeKey;
154 
155     private final Integer initialFlowControlWindowSize;
156 
157     ChannelHandlerContext ctx;
158 
159     /**
160      * Number of buffered streams if the {@link StreamBufferingEncoder} is used.
161      **/
162     private int numBufferedStreams;
163     private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
164             new IntObjectHashMap<DefaultHttp2FrameStream>(8);
165 
166     Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings,
167                     boolean decoupleCloseAndGoAway, boolean flushPreface) {
168         super(decoder, encoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
169 
170         decoder.frameListener(new FrameListener());
171         connection().addListener(new ConnectionListener());
172         connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
173         streamKey = connection().newKey();
174         upgradeKey = connection().newKey();
175         initialFlowControlWindowSize = initialSettings.initialWindowSize();
176     }
177 
178     /**
179      * Creates a new outbound/local stream.
180      */
181     DefaultHttp2FrameStream newStream() {
182         return new DefaultHttp2FrameStream();
183     }
184 
185     /**
186      * Iterates over all active HTTP/2 streams.
187      *
188      * <p>This method must not be called outside of the event loop.
189      */
190     final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
191         assert ctx.executor().inEventLoop();
192         if (connection().numActiveStreams() > 0) {
193             connection().forEachActiveStream(new Http2StreamVisitor() {
194                 @Override
195                 public boolean visit(Http2Stream stream) {
196                     try {
197                         return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
198                     } catch (Throwable cause) {
199                         onError(ctx, false, cause);
200                         return false;
201                     }
202                 }
203             });
204         }
205     }
206 
207     /**
208      * Retrieve the number of streams currently in the process of being initialized.
209      * <p>
210      * This is package-private for testing only.
211      */
212     int numInitializingStreams() {
213         return frameStreamToInitializeMap.size();
214     }
215 
216     @Override
217     public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
218         this.ctx = ctx;
219         super.handlerAdded(ctx);
220         handlerAdded0(ctx);
221         // Must be after Http2ConnectionHandler does its initialization in handlerAdded above.
222         // The server will not send a connection preface so we are good to send a window update.
223         Http2Connection connection = connection();
224         if (connection.isServer()) {
225             tryExpandConnectionFlowControlWindow(connection);
226         }
227     }
228 
229     private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
230         if (initialFlowControlWindowSize != null) {
231             // The window size in the settings explicitly excludes the connection window. So we manually manipulate the
232             // connection window to accommodate more concurrent data per connection.
233             Http2Stream connectionStream = connection.connectionStream();
234             Http2LocalFlowController localFlowController = connection.local().flowController();
235             final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
236             // Only increase the connection window, don't decrease it.
237             if (delta > 0) {
238                 // Double the delta just so a single stream can't exhaust the connection window.
239                 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
240                 flush(ctx);
241             }
242         }
243     }
244 
245     void handlerAdded0(@SuppressWarnings("unsed") ChannelHandlerContext ctx) throws Exception {
246         // sub-class can override this for extra steps that needs to be done when the handler is added.
247     }
248 
249     /**
250      * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
251      * HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
252      */
253     @Override
254     public final void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
255         if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
256             // The user event implies that we are on the client.
257             tryExpandConnectionFlowControlWindow(connection());
258 
259             // We schedule this on the EventExecutor to allow to have any extra handlers added to the pipeline
260             // before we pass the event to the next handler. This is needed as the event may be called from within
261             // handlerAdded(...) which will be run before other handlers will be added to the pipeline.
262             ctx.executor().execute(new Runnable() {
263                 @Override
264                 public void run() {
265                     ctx.fireUserEventTriggered(evt);
266                 }
267             });
268         } else if (evt instanceof UpgradeEvent) {
269             UpgradeEvent upgrade = (UpgradeEvent) evt;
270             try {
271                 onUpgradeEvent(ctx, upgrade.retain());
272                 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
273                 if (stream.getProperty(streamKey) == null) {
274                     // TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
275                     // The stream was already made active, but ctx may have been null so it wasn't initialized.
276                     // https://github.com/netty/netty/issues/4942
277                     onStreamActive0(stream);
278                 }
279                 upgrade.upgradeRequest().headers().setInt(
280                         HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
281                 stream.setProperty(upgradeKey, true);
282                 InboundHttpToHttp2Adapter.handle(
283                         ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest().retain());
284             } finally {
285                 upgrade.release();
286             }
287         } else {
288             ctx.fireUserEventTriggered(evt);
289         }
290     }
291 
292     /**
293      * Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
294      * streams.
295      */
296     @Override
297     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
298         if (msg instanceof Http2DataFrame) {
299             Http2DataFrame dataFrame = (Http2DataFrame) msg;
300             encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
301                     dataFrame.padding(), dataFrame.isEndStream(), promise);
302         } else if (msg instanceof Http2HeadersFrame) {
303             writeHeadersFrame(ctx, (Http2HeadersFrame) msg, promise);
304         } else if (msg instanceof Http2WindowUpdateFrame) {
305             Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
306             Http2FrameStream frameStream = frame.stream();
307             // It is legit to send a WINDOW_UPDATE frame for the connection stream. The parent channel doesn't attempt
308             // to set the Http2FrameStream so we assume if it is null the WINDOW_UPDATE is for the connection stream.
309             try {
310                 if (frameStream == null) {
311                     increaseInitialConnectionWindow(frame.windowSizeIncrement());
312                 } else {
313                     consumeBytes(frameStream.id(), frame.windowSizeIncrement());
314                 }
315                 promise.setSuccess();
316             } catch (Throwable t) {
317                 promise.setFailure(t);
318             }
319         } else if (msg instanceof Http2ResetFrame) {
320             Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
321             int id = rstFrame.stream().id();
322             // Only ever send a reset frame if stream may have existed before as otherwise we may send a RST on a
323             // stream in an invalid state and cause a connection error.
324             if (connection().streamMayHaveExisted(id)) {
325                 encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
326             } else {
327                 ReferenceCountUtil.release(rstFrame);
328                 promise.setFailure(Http2Exception.streamError(
329                         rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed"));
330             }
331         } else if (msg instanceof Http2PingFrame) {
332             Http2PingFrame frame = (Http2PingFrame) msg;
333             encoder().writePing(ctx, frame.ack(), frame.content(), promise);
334         } else if (msg instanceof Http2SettingsFrame) {
335             encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings(), promise);
336         } else if (msg instanceof Http2SettingsAckFrame) {
337             // In the event of manual SETTINGS ACK, it is assumed the encoder will apply the earliest received but not
338             // yet ACKed settings.
339             encoder().writeSettingsAck(ctx, promise);
340         } else if (msg instanceof Http2GoAwayFrame) {
341             writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
342         } else if (msg instanceof Http2PushPromiseFrame) {
343             Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg;
344             writePushPromise(ctx, pushPromiseFrame, promise);
345         } else if (msg instanceof Http2PriorityFrame) {
346             Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg;
347             encoder().writePriority(ctx, priorityFrame.stream().id(), priorityFrame.streamDependency(),
348                     priorityFrame.weight(), priorityFrame.exclusive(), promise);
349         } else if (msg instanceof Http2UnknownFrame) {
350             Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
351             encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
352                     unknownFrame.flags(), unknownFrame.content(), promise);
353         } else if (!(msg instanceof Http2Frame)) {
354             ctx.write(msg, promise);
355         } else {
356             ReferenceCountUtil.release(msg);
357             throw new UnsupportedMessageTypeException(msg);
358         }
359     }
360 
361     private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
362         // The LocalFlowController is responsible for detecting over/under flow.
363         connection().local().flowController().incrementWindowSize(connection().connectionStream(), deltaBytes);
364     }
365 
366     final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
367         Http2Stream stream = connection().stream(streamId);
368         // Upgraded requests are ineligible for stream control. We add the null check
369         // in case the stream has been deregistered.
370         if (stream != null && streamId == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
371             Boolean upgraded = stream.getProperty(upgradeKey);
372             if (Boolean.TRUE.equals(upgraded)) {
373                 return false;
374             }
375         }
376 
377         return connection().local().flowController().consumeBytes(stream, bytes);
378     }
379 
380     private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame, ChannelPromise promise) {
381         if (frame.lastStreamId() > -1) {
382             frame.release();
383             throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
384         }
385 
386         int lastStreamCreated = connection().remote().lastStreamCreated();
387         long lastStreamId = lastStreamCreated + ((long) frame.extraStreamIds()) * 2;
388         // Check if the computation overflowed.
389         if (lastStreamId > Integer.MAX_VALUE) {
390             lastStreamId = Integer.MAX_VALUE;
391         }
392         goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise);
393     }
394 
395     private void writeHeadersFrame(final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame,
396                                    final ChannelPromise promise) {
397 
398         if (isStreamIdValid(headersFrame.stream().id())) {
399             encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(),
400                     headersFrame.isEndStream(), promise);
401         } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) headersFrame.stream(), promise)) {
402             final int streamId = headersFrame.stream().id();
403 
404             encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
405                     headersFrame.isEndStream(), promise);
406 
407             if (!promise.isDone()) {
408                 numBufferedStreams++;
409                 // Clean up the stream being initialized if writing the headers fails and also
410                 // decrement the number of buffered streams.
411                 promise.addListener(new ChannelFutureListener() {
412                     @Override
413                     public void operationComplete(ChannelFuture channelFuture) {
414                         numBufferedStreams--;
415                         handleHeaderFuture(channelFuture, streamId);
416                     }
417                 });
418             } else {
419                 handleHeaderFuture(promise, streamId);
420             }
421         }
422     }
423 
424     private void writePushPromise(final ChannelHandlerContext ctx, Http2PushPromiseFrame pushPromiseFrame,
425                                   final ChannelPromise promise) {
426         if (isStreamIdValid(pushPromiseFrame.pushStream().id())) {
427             encoder().writePushPromise(ctx, pushPromiseFrame.stream().id(), pushPromiseFrame.pushStream().id(),
428                     pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
429         } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) pushPromiseFrame.pushStream(), promise)) {
430             final int streamId = pushPromiseFrame.stream().id();
431             encoder().writePushPromise(ctx, streamId, pushPromiseFrame.pushStream().id(),
432                     pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
433 
434             if (promise.isDone()) {
435                 handleHeaderFuture(promise, streamId);
436             } else {
437                 numBufferedStreams++;
438                 // Clean up the stream being initialized if writing the headers fails and also
439                 // decrement the number of buffered streams.
440                 promise.addListener(new ChannelFutureListener() {
441                     @Override
442                     public void operationComplete(ChannelFuture channelFuture) {
443                         numBufferedStreams--;
444                         handleHeaderFuture(channelFuture, streamId);
445                     }
446                 });
447             }
448         }
449     }
450 
451     private boolean initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream,
452                                         ChannelPromise promise) {
453         final Http2Connection connection = connection();
454         final int streamId = connection.local().incrementAndGetNextStreamId();
455         if (streamId < 0) {
456             promise.setFailure(new Http2NoMoreStreamIdsException());
457 
458             // Simulate a GOAWAY being received due to stream exhaustion on this connection. We use the maximum
459             // valid stream ID for the current peer.
460             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE :
461                     Integer.MAX_VALUE - 1, NO_ERROR.code(),
462                     writeAscii(ctx.alloc(), "Stream IDs exhausted on local stream creation")));
463 
464             return false;
465         }
466         http2FrameStream.id = streamId;
467 
468         // Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
469         // stream in a field directly we may override the stored field before onStreamAdded(...) was called
470         // and so not correctly set the property for the buffered stream.
471         //
472         // See https://github.com/netty/netty/issues/8692
473         Object old = frameStreamToInitializeMap.put(streamId, http2FrameStream);
474 
475         // We should not re-use ids.
476         assert old == null;
477         return true;
478     }
479 
480     private void handleHeaderFuture(ChannelFuture channelFuture, int streamId) {
481         if (!channelFuture.isSuccess()) {
482             frameStreamToInitializeMap.remove(streamId);
483         }
484     }
485 
486     private void onStreamActive0(Http2Stream stream) {
487         if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID &&
488                 connection().local().isValidStreamId(stream.id())) {
489             return;
490         }
491 
492         DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
493         onHttp2StreamStateChanged(ctx, stream2);
494     }
495 
496     private final class ConnectionListener extends Http2ConnectionAdapter {
497         @Override
498         public void onStreamAdded(Http2Stream stream) {
499             DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());
500 
501             if (frameStream != null) {
502                 frameStream.setStreamAndProperty(streamKey, stream);
503             }
504         }
505 
506         @Override
507         public void onStreamActive(Http2Stream stream) {
508             onStreamActive0(stream);
509         }
510 
511         @Override
512         public void onStreamClosed(Http2Stream stream) {
513             onHttp2StreamStateChanged0(stream);
514         }
515 
516         @Override
517         public void onStreamHalfClosed(Http2Stream stream) {
518             onHttp2StreamStateChanged0(stream);
519         }
520 
521         private void onHttp2StreamStateChanged0(Http2Stream stream) {
522             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
523             if (stream2 != null) {
524                 onHttp2StreamStateChanged(ctx, stream2);
525             }
526         }
527     }
528 
529     @Override
530     protected void onConnectionError(
531             ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
532         if (!outbound) {
533             // allow the user to handle it first in the pipeline, and then automatically clean up.
534             // If this is not desired behavior the user can override this method.
535             //
536             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
537             ctx.fireExceptionCaught(cause);
538         }
539         super.onConnectionError(ctx, outbound, cause, http2Ex);
540     }
541 
542     /**
543      * Exceptions for unknown streams, that is streams that have no {@link Http2FrameStream} object attached
544      * are simply logged and replied to by sending a RST_STREAM frame.
545      */
546     @Override
547     protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
548                                        Http2Exception.StreamException streamException) {
549         int streamId = streamException.streamId();
550         Http2Stream connectionStream = connection().stream(streamId);
551         if (connectionStream == null) {
552             onHttp2UnknownStreamError(ctx, cause, streamException);
553             // Write a RST_STREAM
554             super.onStreamError(ctx, outbound, cause, streamException);
555             return;
556         }
557 
558         Http2FrameStream stream = connectionStream.getProperty(streamKey);
559         if (stream == null) {
560             LOG.warn("Stream exception thrown without stream object attached.", cause);
561             // Write a RST_STREAM
562             super.onStreamError(ctx, outbound, cause, streamException);
563             return;
564         }
565 
566         if (!outbound) {
567             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
568             onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
569         }
570     }
571 
572     private static void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx,
573             Throwable cause, Http2Exception.StreamException streamException) {
574         // We log here for debugging purposes. This exception will be propagated to the upper layers through other ways:
575         // - fireExceptionCaught
576         // - fireUserEventTriggered(Http2ResetFrame), see Http2MultiplexHandler#channelRead(...)
577         // - by failing write promise
578         // Receiver of the error is responsible for correct handling of this exception.
579         LOG.log(DEBUG, "Stream exception thrown for unknown stream {}.", streamException.streamId(), cause);
580     }
581 
582     @Override
583     protected final boolean isGracefulShutdownComplete() {
584         return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
585     }
586 
587     private final class FrameListener implements Http2FrameListener {
588 
589         @Override
590         public void onUnknownFrame(
591                 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
592             if (streamId == 0) {
593                 // Ignore unknown frames on connection stream, for example: HTTP/2 GREASE testing
594                 return;
595             }
596             onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
597                     .stream(requireStream(streamId)).retain());
598         }
599 
600         @Override
601         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
602             onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
603         }
604 
605         @Override
606         public void onPingRead(ChannelHandlerContext ctx, long data) {
607             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
608         }
609 
610         @Override
611         public void onPingAckRead(ChannelHandlerContext ctx, long data) {
612             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
613         }
614 
615         @Override
616         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
617             onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
618         }
619 
620         @Override
621         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
622             if (streamId == 0) {
623                 // Ignore connection window updates.
624                 return;
625             }
626             onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
627         }
628 
629         @Override
630         public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
631                                   Http2Headers headers, int streamDependency, short weight, boolean
632                                           exclusive, int padding, boolean endStream) {
633             onHeadersRead(ctx, streamId, headers, padding, endStream);
634         }
635 
636         @Override
637         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
638                                   int padding, boolean endOfStream) {
639             onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
640                     .stream(requireStream(streamId)));
641         }
642 
643         @Override
644         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
645                               boolean endOfStream) {
646             onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
647                     .stream(requireStream(streamId)).retain());
648             // We return the bytes in consumeBytes() once the stream channel consumed the bytes.
649             return 0;
650         }
651 
652         @Override
653         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
654             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain());
655         }
656 
657         @Override
658         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
659                                    short weight, boolean exclusive) {
660 
661             Http2Stream stream = connection().stream(streamId);
662             if (stream == null) {
663                 // The stream was not opened yet, let's just ignore this for now.
664                 return;
665             }
666             onHttp2Frame(ctx, new DefaultHttp2PriorityFrame(streamDependency, weight, exclusive)
667                     .stream(requireStream(streamId)));
668         }
669 
670         @Override
671         public void onSettingsAckRead(ChannelHandlerContext ctx) {
672             onHttp2Frame(ctx, Http2SettingsAckFrame.INSTANCE);
673         }
674 
675         @Override
676         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
677                                       Http2Headers headers, int padding) {
678             onHttp2Frame(ctx, new DefaultHttp2PushPromiseFrame(headers, padding, promisedStreamId)
679                     .pushStream(new DefaultHttp2FrameStream()
680                             .setStreamAndProperty(streamKey, connection().stream(promisedStreamId)))
681                     .stream(requireStream(streamId)));
682         }
683 
684         private Http2FrameStream requireStream(int streamId) {
685             Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
686             if (stream == null) {
687                 throw new IllegalStateException("Stream object required for identifier: " + streamId);
688             }
689             return stream;
690         }
691     }
692 
693     private void onUpgradeEvent(ChannelHandlerContext ctx, UpgradeEvent evt) {
694         ctx.fireUserEventTriggered(evt);
695     }
696 
697     private void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream,
698                                                  @SuppressWarnings("unused") boolean writable) {
699         ctx.fireUserEventTriggered(stream.writabilityChanged);
700     }
701 
702     void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
703         ctx.fireUserEventTriggered(stream.stateChanged);
704     }
705 
706     void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
707         ctx.fireChannelRead(frame);
708     }
709 
710     void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
711         ctx.fireExceptionCaught(cause);
712     }
713 
714     private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
715         @Override
716         public void writabilityChanged(Http2Stream stream) {
717             DefaultHttp2FrameStream frameStream = stream.getProperty(streamKey);
718             if (frameStream == null) {
719                 return;
720             }
721             onHttp2StreamWritabilityChanged(
722                     ctx, frameStream, connection().remote().flowController().isWritable(stream));
723         }
724     }
725 
726     /**
727      * {@link Http2FrameStream} implementation.
728      */
729     // TODO(buchgr): Merge Http2FrameStream and Http2Stream.
730     static class DefaultHttp2FrameStream implements Http2FrameStream {
731 
732         private volatile int id = -1;
733         private volatile Http2Stream stream;
734 
735         final Http2FrameStreamEvent stateChanged = Http2FrameStreamEvent.stateChanged(this);
736         final Http2FrameStreamEvent writabilityChanged = Http2FrameStreamEvent.writabilityChanged(this);
737 
738         Channel attachment;
739 
740         DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
741             assert id == -1 || stream.id() == id;
742             this.stream = stream;
743             stream.setProperty(streamKey, this);
744             return this;
745         }
746 
747         @Override
748         public int id() {
749             Http2Stream stream = this.stream;
750             return stream == null ? id : stream.id();
751         }
752 
753         @Override
754         public State state() {
755             Http2Stream stream = this.stream;
756             return stream == null ? State.IDLE : stream.state();
757         }
758 
759         @Override
760         public String toString() {
761             return String.valueOf(id());
762         }
763     }
764 }