View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.UnsupportedMessageTypeException;
26  import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
27  import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
28  import io.netty.handler.codec.http2.Http2Stream.State;
29  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
30  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
31  import io.netty.util.ReferenceCountUtil;
32  import io.netty.util.ReferenceCounted;
33  import io.netty.util.collection.IntObjectHashMap;
34  import io.netty.util.collection.IntObjectMap;
35  import io.netty.util.internal.logging.InternalLogger;
36  import io.netty.util.internal.logging.InternalLoggerFactory;
37  
38  import static io.netty.buffer.ByteBufUtil.writeAscii;
39  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
40  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
41  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
42  import static io.netty.util.internal.logging.InternalLogLevel.DEBUG;
43  
44  /**
45   * <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over this API.
46   * This API is targeted to eventually replace or reduce the need for the {@link Http2ConnectionHandler} API.
47   *
48   * <p>An HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
49   * frame, an {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
50   * objects received via {@link #write} are converted to the HTTP/2 wire format. HTTP/2 frames specific to a stream
51   * implement the {@link Http2StreamFrame} interface. The {@link Http2FrameCodec} is instantiated using the
52   * {@link Http2FrameCodecBuilder}. It's recommended for channel handlers to inherit from the
53   * {@link Http2ChannelDuplexHandler}, as it provides additional functionality like iterating over all active streams or
54   * creating outbound streams.
55   *
56   * <h3>Stream Lifecycle</h3>
57   * <p>
58   * The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
59   * {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
60   * {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream.
61   *
62   * <p>{@link Http2StreamFrame}s read from the channel always a {@link Http2FrameStream} object set, while when writing a
63   * {@link Http2StreamFrame} the application code needs to set a {@link Http2FrameStream} object using
64   * {@link Http2StreamFrame#stream(Http2FrameStream)}.
65   *
66   * <h3>Flow control</h3>
67   * <p>
68   * The frame codec automatically increments stream and connection flow control windows.
69   *
70   * <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
71   * number of bytes and the corresponding stream identifier set to the frame codec.
72   *
73   * <p>The local stream-level flow control window can be changed by writing a {@link Http2SettingsFrame} with the
74   * {@link Http2Settings#initialWindowSize()} set to the targeted value.
75   *
76   * <p>The connection-level flow control window can be changed by writing a {@link Http2WindowUpdateFrame} with the
77   * desired window size <em>increment</em> in bytes and the stream identifier set to {@code 0}. By default the initial
78   * connection-level flow control window is the same as initial stream-level flow control window.
79   *
80   * <h3>New inbound Streams</h3>
81   * <p>
82   * The first frame of an HTTP/2 stream must be an {@link Http2HeadersFrame}, which will have an {@link Http2FrameStream}
83   * object attached.
84   *
85   * <h3>New outbound Streams</h3>
86   * <p>
87   * A outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via
88   * {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
89   * attached.
90   *
91   * <pre> {@code
92   *     final Http2Stream2 stream = handler.newStream();
93   *     ctx.write(headersFrame.stream(stream)).addListener(new ChannelFutureListener() {
94   *
95   *         @Override
96   *         public void operationComplete(ChannelFuture f) {
97   *             if (f.isSuccess()) {
98   *                 // Stream is active and stream.id() returns a valid stream identifier.
99   *                 System.out.println("New stream with id " + stream.id() + " created.");
100  *             } else {
101  *                 // Stream failed to become active. Handle error.
102  *                 if (f.cause() instanceof Http2NoMoreStreamIdsException) {
103  *
104  *                 } else if (f.cause() instanceof Http2GoAwayException) {
105  *
106  *                 } else {
107  *
108  *                 }
109  *             }
110  *         }
111  *     }
112  *     }
113  * </pre>
114  *
115  * <p>If a new stream cannot be created due to stream id exhaustion of the endpoint, the {@link ChannelPromise} of the
116  * HEADERS frame will fail with a {@link Http2NoMoreStreamIdsException}.
117  *
118  * <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
119  * {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
120  * the {@link Http2FrameCodec} can be build with
121  * {@link Http2FrameCodecBuilder#encoderEnforceMaxConcurrentStreams(boolean)} enabled, in which case a new stream and
122  * its associated frames will be buffered until either the limit is increased or an active stream is closed. It's,
123  * however, possible that a buffered stream will never become active. That is, the channel might
124  * get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
125  * {@link Http2ChannelClosedException}. In the second case, all writes of buffered streams with an identifier less than
126  * the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
127  *
128  * <h3>Error Handling</h3>
129  * <p>
130  * Exceptions and errors are propagated via {@link ChannelInboundHandler#exceptionCaught}. Exceptions that apply to
131  * a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding
132  * {@link Http2FrameStream} object attached.
133  *
134  * <h3>Reference Counting</h3>
135  * <p>
136  * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
137  * reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before
138  * propagating a reference counted object through the pipeline, and thus an application handler needs to release such
139  * an object after having consumed it. For more information on reference counting take a look at
140  * https://netty.io/wiki/reference-counted-objects.html
141  *
142  * <h3>HTTP Upgrade</h3>
143  * <p>
144  * Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
145  * HTTP-to-HTTP/2 conversion is performed automatically.
146  */
147 public class Http2FrameCodec extends Http2ConnectionHandler {
148 
149     private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);
150 
151     private static final Class<?>[] SUPPORTED_MESSAGES = new Class[] {
152             Http2DataFrame.class, Http2HeadersFrame.class, Http2WindowUpdateFrame.class, Http2ResetFrame.class,
153             Http2PingFrame.class, Http2SettingsFrame.class, Http2SettingsAckFrame.class, Http2GoAwayFrame.class,
154             Http2PushPromiseFrame.class, Http2PriorityFrame.class, Http2UnknownFrame.class };
155 
156     protected final PropertyKey streamKey;
157     private final PropertyKey upgradeKey;
158 
159     private final Integer initialFlowControlWindowSize;
160 
161     ChannelHandlerContext ctx;
162 
163     /**
164      * Number of buffered streams if the {@link StreamBufferingEncoder} is used.
165      **/
166     private int numBufferedStreams;
167     private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
168             new IntObjectHashMap<DefaultHttp2FrameStream>(8);
169 
170     Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings,
171                     boolean decoupleCloseAndGoAway, boolean flushPreface) {
172         super(decoder, encoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
173 
174         decoder.frameListener(new FrameListener());
175         connection().addListener(new ConnectionListener());
176         connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
177         streamKey = connection().newKey();
178         upgradeKey = connection().newKey();
179         initialFlowControlWindowSize = initialSettings.initialWindowSize();
180     }
181 
182     /**
183      * Creates a new outbound/local stream.
184      */
185     DefaultHttp2FrameStream newStream() {
186         return new DefaultHttp2FrameStream();
187     }
188 
189     /**
190      * Iterates over all active HTTP/2 streams.
191      *
192      * <p>This method must not be called outside of the event loop.
193      */
194     final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
195         assert ctx.executor().inEventLoop();
196         if (connection().numActiveStreams() > 0) {
197             connection().forEachActiveStream(new Http2StreamVisitor() {
198                 @Override
199                 public boolean visit(Http2Stream stream) {
200                     try {
201                         return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
202                     } catch (Throwable cause) {
203                         onError(ctx, false, cause);
204                         return false;
205                     }
206                 }
207             });
208         }
209     }
210 
211     /**
212      * Retrieve the number of streams currently in the process of being initialized.
213      * <p>
214      * This is package-private for testing only.
215      */
216     int numInitializingStreams() {
217         return frameStreamToInitializeMap.size();
218     }
219 
220     @Override
221     public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
222         this.ctx = ctx;
223         super.handlerAdded(ctx);
224         handlerAdded0(ctx);
225         // Must be after Http2ConnectionHandler does its initialization in handlerAdded above.
226         // The server will not send a connection preface so we are good to send a window update.
227         Http2Connection connection = connection();
228         if (connection.isServer()) {
229             tryExpandConnectionFlowControlWindow(connection);
230         }
231     }
232 
233     private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
234         if (initialFlowControlWindowSize != null) {
235             // The window size in the settings explicitly excludes the connection window. So we manually manipulate the
236             // connection window to accommodate more concurrent data per connection.
237             Http2Stream connectionStream = connection.connectionStream();
238             Http2LocalFlowController localFlowController = connection.local().flowController();
239             final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
240             // Only increase the connection window, don't decrease it.
241             if (delta > 0) {
242                 // Double the delta just so a single stream can't exhaust the connection window.
243                 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
244                 flush(ctx);
245             }
246         }
247     }
248 
249     void handlerAdded0(@SuppressWarnings("unsed") ChannelHandlerContext ctx) throws Exception {
250         // sub-class can override this for extra steps that needs to be done when the handler is added.
251     }
252 
253     /**
254      * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
255      * HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
256      */
257     @Override
258     public final void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
259         if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
260             // The user event implies that we are on the client.
261             tryExpandConnectionFlowControlWindow(connection());
262 
263             // We schedule this on the EventExecutor to allow to have any extra handlers added to the pipeline
264             // before we pass the event to the next handler. This is needed as the event may be called from within
265             // handlerAdded(...) which will be run before other handlers will be added to the pipeline.
266             ctx.executor().execute(new Runnable() {
267                 @Override
268                 public void run() {
269                     ctx.fireUserEventTriggered(evt);
270                 }
271             });
272         } else if (evt instanceof UpgradeEvent) {
273             UpgradeEvent upgrade = (UpgradeEvent) evt;
274             try {
275                 onUpgradeEvent(ctx, upgrade.retain());
276                 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
277                 if (stream.getProperty(streamKey) == null) {
278                     // TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
279                     // The stream was already made active, but ctx may have been null so it wasn't initialized.
280                     // https://github.com/netty/netty/issues/4942
281                     onStreamActive0(stream);
282                 }
283                 upgrade.upgradeRequest().headers().setInt(
284                         HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
285                 stream.setProperty(upgradeKey, true);
286                 InboundHttpToHttp2Adapter.handle(
287                         ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest().retain());
288             } finally {
289                 upgrade.release();
290             }
291         } else {
292             onUserEventTriggered(ctx, evt);
293             ctx.fireUserEventTriggered(evt);
294         }
295     }
296 
297     void onUserEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
298         // noop
299     }
300 
301     /**
302      * Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
303      * streams.
304      */
305     @Override
306     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
307         if (msg instanceof Http2DataFrame) {
308             Http2DataFrame dataFrame = (Http2DataFrame) msg;
309             encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
310                     dataFrame.padding(), dataFrame.isEndStream(), promise);
311         } else if (msg instanceof Http2HeadersFrame) {
312             writeHeadersFrame(ctx, (Http2HeadersFrame) msg, promise);
313         } else if (msg instanceof Http2WindowUpdateFrame) {
314             Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
315             Http2FrameStream frameStream = frame.stream();
316             // It is legit to send a WINDOW_UPDATE frame for the connection stream. The parent channel doesn't attempt
317             // to set the Http2FrameStream so we assume if it is null the WINDOW_UPDATE is for the connection stream.
318             try {
319                 if (frameStream == null) {
320                     increaseInitialConnectionWindow(frame.windowSizeIncrement());
321                 } else {
322                     consumeBytes(frameStream.id(), frame.windowSizeIncrement());
323                 }
324                 promise.setSuccess();
325             } catch (Throwable t) {
326                 promise.setFailure(t);
327             }
328         } else if (msg instanceof Http2ResetFrame) {
329             Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
330             int id = rstFrame.stream().id();
331             // Only ever send a reset frame if stream may have existed before as otherwise we may send a RST on a
332             // stream in an invalid state and cause a connection error.
333             if (connection().streamMayHaveExisted(id)) {
334                 encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
335             } else {
336                 ReferenceCountUtil.release(rstFrame);
337                 promise.setFailure(Http2Exception.streamError(
338                         rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed"));
339             }
340         } else if (msg instanceof Http2PingFrame) {
341             Http2PingFrame frame = (Http2PingFrame) msg;
342             encoder().writePing(ctx, frame.ack(), frame.content(), promise);
343         } else if (msg instanceof Http2SettingsFrame) {
344             encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings(), promise);
345         } else if (msg instanceof Http2SettingsAckFrame) {
346             // In the event of manual SETTINGS ACK, it is assumed the encoder will apply the earliest received but not
347             // yet ACKed settings.
348             encoder().writeSettingsAck(ctx, promise);
349         } else if (msg instanceof Http2GoAwayFrame) {
350             writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
351         } else if (msg instanceof Http2PushPromiseFrame) {
352             Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg;
353             writePushPromise(ctx, pushPromiseFrame, promise);
354         } else if (msg instanceof Http2PriorityFrame) {
355             Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg;
356             encoder().writePriority(ctx, priorityFrame.stream().id(), priorityFrame.streamDependency(),
357                     priorityFrame.weight(), priorityFrame.exclusive(), promise);
358         } else if (msg instanceof Http2UnknownFrame) {
359             Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
360             encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
361                     unknownFrame.flags(), unknownFrame.content(), promise);
362         } else if (!(msg instanceof Http2Frame)) {
363             ctx.write(msg, promise);
364         } else {
365             ReferenceCountUtil.release(msg);
366             throw new UnsupportedMessageTypeException(msg, SUPPORTED_MESSAGES);
367         }
368     }
369 
370     private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
371         // The LocalFlowController is responsible for detecting over/under flow.
372         connection().local().flowController().incrementWindowSize(connection().connectionStream(), deltaBytes);
373     }
374 
375     final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
376         Http2Stream stream = connection().stream(streamId);
377         // Upgraded requests are ineligible for stream control. We add the null check
378         // in case the stream has been deregistered.
379         if (stream != null && streamId == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
380             Boolean upgraded = stream.getProperty(upgradeKey);
381             if (Boolean.TRUE.equals(upgraded)) {
382                 return false;
383             }
384         }
385 
386         return connection().local().flowController().consumeBytes(stream, bytes);
387     }
388 
389     private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame, ChannelPromise promise) {
390         if (frame.lastStreamId() > -1) {
391             frame.release();
392             throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
393         }
394 
395         int lastStreamCreated = connection().remote().lastStreamCreated();
396         long lastStreamId = lastStreamCreated + ((long) frame.extraStreamIds()) * 2;
397         // Check if the computation overflowed.
398         if (lastStreamId > Integer.MAX_VALUE) {
399             lastStreamId = Integer.MAX_VALUE;
400         }
401         goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise);
402     }
403 
404     private void writeHeadersFrame(final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame,
405                                    ChannelPromise promise) {
406 
407         if (isStreamIdValid(headersFrame.stream().id())) {
408             encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(),
409                     headersFrame.isEndStream(), promise);
410         } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) headersFrame.stream(), promise)) {
411             promise = promise.unvoid();
412 
413             final int streamId = headersFrame.stream().id();
414 
415             encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
416                     headersFrame.isEndStream(), promise);
417 
418             if (!promise.isDone()) {
419                 numBufferedStreams++;
420                 // Clean up the stream being initialized if writing the headers fails and also
421                 // decrement the number of buffered streams.
422                 promise.addListener(new ChannelFutureListener() {
423                     @Override
424                     public void operationComplete(ChannelFuture channelFuture) {
425                         numBufferedStreams--;
426                         handleHeaderFuture(channelFuture, streamId);
427                     }
428                 });
429             } else {
430                 handleHeaderFuture(promise, streamId);
431             }
432         }
433     }
434 
435     private void writePushPromise(final ChannelHandlerContext ctx, Http2PushPromiseFrame pushPromiseFrame,
436                                   final ChannelPromise promise) {
437         if (isStreamIdValid(pushPromiseFrame.pushStream().id())) {
438             encoder().writePushPromise(ctx, pushPromiseFrame.stream().id(), pushPromiseFrame.pushStream().id(),
439                     pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
440         } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) pushPromiseFrame.pushStream(), promise)) {
441             final int streamId = pushPromiseFrame.stream().id();
442             encoder().writePushPromise(ctx, streamId, pushPromiseFrame.pushStream().id(),
443                     pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
444 
445             if (promise.isDone()) {
446                 handleHeaderFuture(promise, streamId);
447             } else {
448                 numBufferedStreams++;
449                 // Clean up the stream being initialized if writing the headers fails and also
450                 // decrement the number of buffered streams.
451                 promise.addListener(new ChannelFutureListener() {
452                     @Override
453                     public void operationComplete(ChannelFuture channelFuture) {
454                         numBufferedStreams--;
455                         handleHeaderFuture(channelFuture, streamId);
456                     }
457                 });
458             }
459         }
460     }
461 
462     private boolean initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream,
463                                         ChannelPromise promise) {
464         final Http2Connection connection = connection();
465         final int streamId = connection.local().incrementAndGetNextStreamId();
466         if (streamId < 0) {
467             promise.setFailure(new Http2NoMoreStreamIdsException());
468 
469             // Simulate a GOAWAY being received due to stream exhaustion on this connection. We use the maximum
470             // valid stream ID for the current peer.
471             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE :
472                     Integer.MAX_VALUE - 1, NO_ERROR.code(),
473                     writeAscii(ctx.alloc(), "Stream IDs exhausted on local stream creation")));
474 
475             return false;
476         }
477         http2FrameStream.id = streamId;
478 
479         // Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
480         // stream in a field directly we may override the stored field before onStreamAdded(...) was called
481         // and so not correctly set the property for the buffered stream.
482         //
483         // See https://github.com/netty/netty/issues/8692
484         Object old = frameStreamToInitializeMap.put(streamId, http2FrameStream);
485 
486         // We should not re-use ids.
487         assert old == null;
488         return true;
489     }
490 
491     private void handleHeaderFuture(ChannelFuture channelFuture, int streamId) {
492         if (!channelFuture.isSuccess()) {
493             frameStreamToInitializeMap.remove(streamId);
494         }
495     }
496 
497     private void onStreamActive0(Http2Stream stream) {
498         if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID &&
499                 connection().local().isValidStreamId(stream.id())) {
500             return;
501         }
502 
503         DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
504         onHttp2StreamStateChanged(ctx, stream2);
505     }
506 
507     private final class ConnectionListener extends Http2ConnectionAdapter {
508         @Override
509         public void onStreamAdded(Http2Stream stream) {
510             DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());
511 
512             if (frameStream != null) {
513                 frameStream.setStreamAndProperty(streamKey, stream);
514             }
515         }
516 
517         @Override
518         public void onStreamActive(Http2Stream stream) {
519             onStreamActive0(stream);
520         }
521 
522         @Override
523         public void onStreamClosed(Http2Stream stream) {
524             onHttp2StreamStateChanged0(stream);
525         }
526 
527         @Override
528         public void onStreamHalfClosed(Http2Stream stream) {
529             onHttp2StreamStateChanged0(stream);
530         }
531 
532         private void onHttp2StreamStateChanged0(Http2Stream stream) {
533             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
534             if (stream2 != null) {
535                 onHttp2StreamStateChanged(ctx, stream2);
536             }
537         }
538     }
539 
540     @Override
541     protected void onConnectionError(
542             ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
543         if (!outbound) {
544             // allow the user to handle it first in the pipeline, and then automatically clean up.
545             // If this is not desired behavior the user can override this method.
546             //
547             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
548             ctx.fireExceptionCaught(cause);
549         }
550         super.onConnectionError(ctx, outbound, cause, http2Ex);
551     }
552 
553     /**
554      * Exceptions for unknown streams, that is streams that have no {@link Http2FrameStream} object attached
555      * are simply logged and replied to by sending a RST_STREAM frame.
556      */
557     @Override
558     protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
559                                        Http2Exception.StreamException streamException) {
560         int streamId = streamException.streamId();
561         Http2Stream connectionStream = connection().stream(streamId);
562         if (connectionStream == null) {
563             onHttp2UnknownStreamError(ctx, cause, streamException);
564             // Write a RST_STREAM
565             super.onStreamError(ctx, outbound, cause, streamException);
566             return;
567         }
568 
569         Http2FrameStream stream = connectionStream.getProperty(streamKey);
570         if (stream == null) {
571             LOG.warn("Stream exception thrown without stream object attached.", cause);
572             // Write a RST_STREAM
573             super.onStreamError(ctx, outbound, cause, streamException);
574             return;
575         }
576 
577         if (!outbound) {
578             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
579             onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
580         }
581     }
582 
583     private static void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx,
584             Throwable cause, Http2Exception.StreamException streamException) {
585         // We log here for debugging purposes. This exception will be propagated to the upper layers through other ways:
586         // - fireExceptionCaught
587         // - fireUserEventTriggered(Http2ResetFrame), see Http2MultiplexHandler#channelRead(...)
588         // - by failing write promise
589         // Receiver of the error is responsible for correct handling of this exception.
590         LOG.log(DEBUG, "Stream exception thrown for unknown stream {}.", streamException.streamId(), cause);
591     }
592 
593     @Override
594     protected final boolean isGracefulShutdownComplete() {
595         return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
596     }
597 
598     private final class FrameListener implements Http2FrameListener {
599 
600         @Override
601         public void onUnknownFrame(
602                 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
603             if (streamId == 0) {
604                 // Ignore unknown frames on connection stream, for example: HTTP/2 GREASE testing
605                 return;
606             }
607             onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
608                     .stream(requireStream(streamId)).retain());
609         }
610 
611         @Override
612         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
613             onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
614         }
615 
616         @Override
617         public void onPingRead(ChannelHandlerContext ctx, long data) {
618             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
619         }
620 
621         @Override
622         public void onPingAckRead(ChannelHandlerContext ctx, long data) {
623             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
624         }
625 
626         @Override
627         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
628             onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
629         }
630 
631         @Override
632         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
633             if (streamId == 0) {
634                 // Ignore connection window updates.
635                 return;
636             }
637             onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
638         }
639 
640         @Override
641         public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
642                                   Http2Headers headers, int streamDependency, short weight, boolean
643                                           exclusive, int padding, boolean endStream) {
644             onHeadersRead(ctx, streamId, headers, padding, endStream);
645         }
646 
647         @Override
648         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
649                                   int padding, boolean endOfStream) {
650             onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
651                     .stream(requireStream(streamId)));
652         }
653 
654         @Override
655         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
656                               boolean endOfStream) {
657             onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
658                     .stream(requireStream(streamId)).retain());
659             // We return the bytes in consumeBytes() once the stream channel consumed the bytes.
660             return 0;
661         }
662 
663         @Override
664         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
665             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain());
666         }
667 
668         @Override
669         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
670                                    short weight, boolean exclusive) {
671 
672             Http2Stream stream = connection().stream(streamId);
673             if (stream == null) {
674                 // The stream was not opened yet, let's just ignore this for now.
675                 return;
676             }
677             onHttp2Frame(ctx, new DefaultHttp2PriorityFrame(streamDependency, weight, exclusive)
678                     .stream(requireStream(streamId)));
679         }
680 
681         @Override
682         public void onSettingsAckRead(ChannelHandlerContext ctx) {
683             onHttp2Frame(ctx, Http2SettingsAckFrame.INSTANCE);
684         }
685 
686         @Override
687         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
688                                       Http2Headers headers, int padding) {
689             onHttp2Frame(ctx, new DefaultHttp2PushPromiseFrame(headers, padding, promisedStreamId)
690                     .pushStream(new DefaultHttp2FrameStream()
691                             .setStreamAndProperty(streamKey, connection().stream(promisedStreamId)))
692                     .stream(requireStream(streamId)));
693         }
694 
695         private Http2FrameStream requireStream(int streamId) {
696             Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
697             if (stream == null) {
698                 throw new IllegalStateException("Stream object required for identifier: " + streamId);
699             }
700             return stream;
701         }
702     }
703 
704     private void onUpgradeEvent(ChannelHandlerContext ctx, UpgradeEvent evt) {
705         ctx.fireUserEventTriggered(evt);
706     }
707 
708     private void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream,
709                                                  @SuppressWarnings("unused") boolean writable) {
710         ctx.fireUserEventTriggered(stream.writabilityChanged);
711     }
712 
713     void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
714         ctx.fireUserEventTriggered(stream.stateChanged);
715     }
716 
717     void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
718         ctx.fireChannelRead(frame);
719     }
720 
721     void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
722         ctx.fireExceptionCaught(cause);
723     }
724 
725     private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
726         @Override
727         public void writabilityChanged(Http2Stream stream) {
728             DefaultHttp2FrameStream frameStream = stream.getProperty(streamKey);
729             if (frameStream == null) {
730                 return;
731             }
732             onHttp2StreamWritabilityChanged(
733                     ctx, frameStream, connection().remote().flowController().isWritable(stream));
734         }
735     }
736 
737     /**
738      * {@link Http2FrameStream} implementation.
739      */
740     // TODO(buchgr): Merge Http2FrameStream and Http2Stream.
741     static class DefaultHttp2FrameStream implements Http2FrameStream {
742 
743         private volatile int id = -1;
744         private volatile Http2Stream stream;
745 
746         final Http2FrameStreamEvent stateChanged = Http2FrameStreamEvent.stateChanged(this);
747         final Http2FrameStreamEvent writabilityChanged = Http2FrameStreamEvent.writabilityChanged(this);
748 
749         Channel attachment;
750 
751         DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
752             assert id == -1 || stream.id() == id;
753             this.stream = stream;
754             stream.setProperty(streamKey, this);
755             return this;
756         }
757 
758         @Override
759         public int id() {
760             Http2Stream stream = this.stream;
761             return stream == null ? id : stream.id();
762         }
763 
764         @Override
765         public State state() {
766             Http2Stream stream = this.stream;
767             return stream == null ? State.IDLE : stream.state();
768         }
769 
770         @Override
771         public String toString() {
772             return String.valueOf(id());
773         }
774     }
775 }