View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.http2;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.util.Resource;
20  import io.netty5.channel.Channel;
21  import io.netty5.channel.ChannelHandler;
22  import io.netty5.channel.ChannelHandlerContext;
23  import io.netty5.handler.codec.UnsupportedMessageTypeException;
24  import io.netty5.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
25  import io.netty5.handler.codec.http2.Http2Connection.PropertyKey;
26  import io.netty5.handler.codec.http2.Http2Stream.State;
27  import io.netty5.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
28  import io.netty5.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
29  import io.netty5.util.ReferenceCounted;
30  import io.netty5.util.collection.IntObjectHashMap;
31  import io.netty5.util.collection.IntObjectMap;
32  import io.netty5.util.concurrent.Future;
33  import io.netty5.util.concurrent.Promise;
34  import io.netty5.util.internal.UnstableApi;
35  import io.netty5.util.internal.logging.InternalLogger;
36  import io.netty5.util.internal.logging.InternalLoggerFactory;
37  
38  import static io.netty5.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
39  import static io.netty5.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
40  import static io.netty5.handler.codec.http2.Http2Error.NO_ERROR;
41  import static io.netty5.util.internal.logging.InternalLogLevel.DEBUG;
42  import static java.nio.charset.StandardCharsets.US_ASCII;
43  
44  /**
45   * <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over this API.
46   * This API is targeted to eventually replace or reduce the need for the {@link Http2ConnectionHandler} API.
47   *
48   * <p>An HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
49   * frame, an {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
50   * objects received via {@link #write} are converted to the HTTP/2 wire format. HTTP/2 frames specific to a stream
51   * implement the {@link Http2StreamFrame} interface. The {@link Http2FrameCodec} is instantiated using the
52   * {@link Http2FrameCodecBuilder}. It's recommended for channel handlers to inherit from the
53   * {@link Http2ChannelDuplexHandler}, as it provides additional functionality like iterating over all active streams or
54   * creating outbound streams.
55   *
56   * <h3>Stream Lifecycle</h3>
57   * <p>
58   * The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
59   * {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
60   * {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream.
61   *
62   * <p>{@link Http2StreamFrame}s read from the channel always have a {@link Http2FrameStream} object set, while when writing a
63   * {@link Http2StreamFrame} the application code needs to set a {@link Http2FrameStream} object using
64   * {@link Http2StreamFrame#stream(Http2FrameStream)}.
65   *
66   * <h3>Flow control</h3>
67   * <p>
68   * The frame codec automatically increments stream and connection flow control windows.
69   *
70   * <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
71   * number of bytes and the corresponding stream identifier set to the frame codec.
72   *
73   * <p>The local stream-level flow control window can be changed by writing a {@link Http2SettingsFrame} with the
74   * {@link Http2Settings#initialWindowSize()} set to the targeted value.
75   *
76   * <p>The connection-level flow control window can be changed by writing a {@link Http2WindowUpdateFrame} with the
77   * desired window size <em>increment</em> in bytes and the stream identifier set to {@code 0}. By default the initial
78   * connection-level flow control window is the same as initial stream-level flow control window.
79   *
80   * <h3>New inbound Streams</h3>
81   * <p>
82   * The first frame of an HTTP/2 stream must be an {@link Http2HeadersFrame}, which will have an {@link Http2FrameStream}
83   * object attached.
84   *
85   * <h3>New outbound Streams</h3>
86   * <p>
87   * An outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via
88   * {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
89   * attached.
90   *
91   * <pre>{@code
92   *     final Http2FrameStream stream = handler.newStream();
93   *     ctx.write(headersFrame.stream(stream)).addListener(new FutureListener<Void>() {
94   *
95   *         @Override
96   *         public void operationComplete(Future<Void> f) {
97   *             if (f.isSuccess()) {
98   *                 // Stream is active and stream.id() returns a valid stream identifier.
99   *                 System.out.println("New stream with id " + stream.id() + " created.");
100  *             } else {
101  *                 // Stream failed to become active. Handle error.
102  *                 if (f.cause() instanceof Http2NoMoreStreamIdsException) {
103  *
104  *                 } else if (f.cause() instanceof Http2GoAwayException) {
105  *
106  *                 } else {
107  *
108  *                 }
109  *             }
110  *         }
111  *     });
112  * }</pre>
113  *
114  * <p>If a new stream cannot be created due to stream id exhaustion of the endpoint, the {@link Promise} of the
115  * HEADERS frame will fail with a {@link Http2NoMoreStreamIdsException}.
116  *
117  * <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
118  * {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
119  * the {@link Http2FrameCodec} can be built with
120  * {@link Http2FrameCodecBuilder#encoderEnforceMaxConcurrentStreams(boolean)} enabled, in which case a new stream and
121  * its associated frames will be buffered until either the limit is increased or an active stream is closed. It's,
122  * however, possible that a buffered stream will never become active. That is, the channel might
123  * get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
124  * {@link Http2ChannelClosedException}. In the second case, all writes of buffered streams with an identifier less than
125  * the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
126  *
127  * <h3>Error Handling</h3>
128  * <p>
129  * Exceptions and errors are propagated via {@link ChannelHandler#channelExceptionCaught}. Exceptions that apply to
130  * a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding
131  * {@link Http2FrameStream} object attached.
132  *
133  * <h3>Reference Counting</h3>
134  * <p>
135  * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
136  * reference counted objects (e.g. {@link Buffer}s). The frame codec will call {@link ReferenceCounted#retain()} before
137  * propagating a reference counted object through the pipeline, and thus an application handler needs to release such
138  * an object after having consumed it. For more information on reference counting take a look at
139  * https://netty.io/wiki/reference-counted-objects.html
140  *
141  * <h3>HTTP Upgrade</h3>
142  * <p>
143  * Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
144  * HTTP-to-HTTP/2 conversion is performed automatically.
145  */
146 @UnstableApi
147 public class Http2FrameCodec extends Http2ConnectionHandler {
148 
149     private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);
150 
151     protected final PropertyKey streamKey;
152     private final PropertyKey upgradeKey;
153 
154     private final Integer initialFlowControlWindowSize;
155 
156     ChannelHandlerContext ctx;
157 
158     /**
159      * Number of buffered streams if the {@link StreamBufferingEncoder} is used.
160      **/
161     private int numBufferedStreams;
162     private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
163             new IntObjectHashMap<>(8);
164 
165     Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings,
166                     boolean decoupleCloseAndGoAway, boolean flushPreface) {
167         super(decoder, encoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
168 
169         decoder.frameListener(new FrameListener());
170         connection().addListener(new ConnectionListener());
171         connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
172         streamKey = connection().newKey();
173         upgradeKey = connection().newKey();
174         initialFlowControlWindowSize = initialSettings.initialWindowSize();
175     }
176 
177     /**
178      * Creates a new outbound/local stream.
179      */
180     DefaultHttp2FrameStream newStream() {
181         return new DefaultHttp2FrameStream();
182     }
183 
184     /**
185      * Iterates over all active HTTP/2 streams.
186      *
187      * <p>This method must not be called outside of the event loop.
188      */
189     final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
190         assert ctx.executor().inEventLoop();
191 
192         if (connection().numActiveStreams() > 0) {
193             connection().forEachActiveStream(stream -> {
194                 try {
195                     return streamVisitor.visit(stream.getProperty(streamKey));
196                 } catch (Throwable cause) {
197                     onError(ctx, false, cause);
198                     return false;
199                 }
200             });
201         }
202     }
203 
204     /**
205      * Retrieve the number of streams currently in the process of being initialized.
206      * <p>
207      * This is package-private for testing only.
208      */
209     int numInitializingStreams() {
210         return frameStreamToInitializeMap.size();
211     }
212 
213     @Override
214     public void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
215         super.handlerAdded0(ctx);
216         this.ctx = ctx;
217         // Must be after Http2ConnectionHandler does its initialization in handlerAdded above.
218         // The server will not send a connection preface so we are good to send a window update.
219         Http2Connection connection = connection();
220         if (connection.isServer()) {
221             tryExpandConnectionFlowControlWindow(connection);
222         }
223     }
224 
225     private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
226         if (initialFlowControlWindowSize != null) {
227             // The window size in the settings explicitly excludes the connection window. So we manually manipulate the
228             // connection window to accommodate more concurrent data per connection.
229             Http2Stream connectionStream = connection.connectionStream();
230             Http2LocalFlowController localFlowController = connection.local().flowController();
231             final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
232             // Only increase the connection window, don't decrease it.
233             if (delta > 0) {
234                 // Double the delta just so a single stream can't exhaust the connection window.
235                 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
236                 flush(ctx);
237             }
238         }
239     }
240 
241     /**
242      * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
243      * HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
244      */
245     @Override
246     public final void channelInboundEvent(final ChannelHandlerContext ctx, final Object evt) throws Exception {
247         if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
248             // The user event implies that we are on the client.
249             tryExpandConnectionFlowControlWindow(connection());
250 
251             // We schedule this on the EventExecutor to allow to have any extra handlers added to the pipeline
252             // before we pass the event to the next handler. This is needed as the event may be called from within
253             // handlerAdded(...) which will be run before other handlers will be added to the pipeline.
254             ctx.executor().execute(() -> ctx.fireChannelInboundEvent(evt));
255         } else if (evt instanceof UpgradeEvent) {
256             try (UpgradeEvent upgrade = (UpgradeEvent) evt) {
257                 // TODO try to avoid full copy (maybe make it read-only?)
258                 ctx.fireChannelInboundEvent(upgrade.copy());
259                 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
260                 if (stream.getProperty(streamKey) == null) {
261                     // TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
262                     // The stream was already made active, but ctx may have been null so it wasn't initialized.
263                     // https://github.com/netty/netty/issues/4942
264                     onStreamActive0(stream);
265                 }
266                 upgrade.upgradeRequest().headers().setInt(
267                         HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
268                 stream.setProperty(upgradeKey, true);
269                 InboundHttpToHttp2Adapter.handle(
270                         ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest());
271             }
272         } else {
273             ctx.fireChannelInboundEvent(evt);
274         }
275     }
276 
277     /**
278      * Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
279      * streams.
280      */
281     @Override
282     public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
283         if (msg instanceof Http2DataFrame) {
284             Http2DataFrame dataFrame = (Http2DataFrame) msg;
285             return encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
286                     dataFrame.padding(), dataFrame.isEndStream());
287         } else if (msg instanceof Http2HeadersFrame) {
288             return writeHeadersFrame(ctx, (Http2HeadersFrame) msg);
289         } else if (msg instanceof Http2WindowUpdateFrame) {
290             Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
291             Http2FrameStream frameStream = frame.stream();
292             // It is legit to send a WINDOW_UPDATE frame for the connection stream. The parent channel doesn't attempt
293             // to set the Http2FrameStream so we assume if it is null the WINDOW_UPDATE is for the connection stream.
294             try {
295                 if (frameStream == null) {
296                     increaseInitialConnectionWindow(frame.windowSizeIncrement());
297                 } else {
298                     consumeBytes(frameStream.id(), frame.windowSizeIncrement());
299                 }
300                 return ctx.newSucceededFuture();
301             } catch (Throwable t) {
302                 return ctx.newFailedFuture(t);
303             }
304         } else if (msg instanceof Http2ResetFrame) {
305             Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
306             int id = rstFrame.stream().id();
307             // Only ever send a reset frame if stream may have existed before as otherwise we may send a RST on a
308             // stream in an invalid state and cause a connection error.
309             if (connection().streamMayHaveExisted(id)) {
310                 return encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode());
311             } else {
312                 Resource.dispose(rstFrame);
313                 return ctx.newFailedFuture(Http2Exception.streamError(
314                         rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed"));
315             }
316         } else if (msg instanceof Http2PingFrame) {
317             Http2PingFrame frame = (Http2PingFrame) msg;
318             return encoder().writePing(ctx, frame.ack(), frame.content());
319         } else if (msg instanceof Http2SettingsFrame) {
320             return encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings());
321         } else if (msg instanceof Http2SettingsAckFrame) {
322             // In the event of manual SETTINGS ACK, it is assumed the encoder will apply the earliest received but not
323             // yet ACKed settings.
324             return encoder().writeSettingsAck(ctx);
325         } else if (msg instanceof Http2GoAwayFrame) {
326             return writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg);
327         } else if (msg instanceof Http2PushPromiseFrame) {
328             Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg;
329             return writePushPromise(ctx, pushPromiseFrame);
330         } else if (msg instanceof Http2PriorityFrame) {
331             Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg;
332             return encoder().writePriority(ctx, priorityFrame.stream().id(), priorityFrame.streamDependency(),
333                     priorityFrame.weight(), priorityFrame.exclusive());
334         } else if (msg instanceof Http2UnknownFrame) {
335             Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
336             return encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
337                     unknownFrame.flags(), unknownFrame.content());
338         } else if (!(msg instanceof Http2Frame)) {
339             return ctx.write(msg);
340         } else {
341             Resource.dispose(msg);
342             return ctx.newFailedFuture(new UnsupportedMessageTypeException(msg));
343         }
344     }
345 
346     private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
347         // The LocalFlowController is responsible for detecting over/under flow.
348         connection().local().flowController().incrementWindowSize(connection().connectionStream(), deltaBytes);
349     }
350 
351     final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
352         Http2Stream stream = connection().stream(streamId);
353         // Upgraded requests are ineligible for stream control. We add the null check
354         // in case the stream has been deregistered.
355         if (stream != null && streamId == HTTP_UPGRADE_STREAM_ID) {
356             Boolean upgraded = stream.getProperty(upgradeKey);
357             if (Boolean.TRUE.equals(upgraded)) {
358                 return false;
359             }
360         }
361 
362         return connection().local().flowController().consumeBytes(stream, bytes);
363     }
364 
365     private Future<Void> writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame) {
366         if (frame.lastStreamId() > -1) {
367             frame.close();
368             return ctx.newFailedFuture(new IllegalArgumentException("Last stream id must not be set on GOAWAY frame"));
369         }
370 
371         int lastStreamCreated = connection().remote().lastStreamCreated();
372         long lastStreamId = lastStreamCreated + (long) frame.extraStreamIds() * 2;
373         // Check if the computation overflowed.
374         if (lastStreamId > Integer.MAX_VALUE) {
375             lastStreamId = Integer.MAX_VALUE;
376         }
377         return goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content());
378     }
379 
380     private Future<Void> writeHeadersFrame(final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame) {
381 
382         if (isStreamIdValid(headersFrame.stream().id())) {
383             return encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(),
384                     headersFrame.padding(), headersFrame.isEndStream());
385         } else {
386             Future<Void> future = initializeNewStream(ctx, (DefaultHttp2FrameStream) headersFrame.stream());
387             if (future == null) {
388                 final int streamId = headersFrame.stream().id();
389 
390                 future = encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
391                         headersFrame.isEndStream());
392 
393                 if (!future.isDone()) {
394                     numBufferedStreams++;
395                     // Clean up the stream being initialized if writing the headers fails and also
396                     // decrement the number of buffered streams.
397                     future.addListener(channelFuture -> {
398                         numBufferedStreams--;
399 
400                         handleHeaderFuture(channelFuture, streamId);
401                     });
402                 } else {
403                     handleHeaderFuture(future, streamId);
404                 }
405             }
406             return future;
407         }
408     }
409 
410     private Future<Void> writePushPromise(final ChannelHandlerContext ctx, Http2PushPromiseFrame pushPromiseFrame) {
411         if (isStreamIdValid(pushPromiseFrame.pushStream().id())) {
412             return encoder().writePushPromise(ctx, pushPromiseFrame.stream().id(), pushPromiseFrame.pushStream().id(),
413                     pushPromiseFrame.http2Headers(), pushPromiseFrame.padding());
414         } else {
415             Future<Void> future = initializeNewStream(ctx, (DefaultHttp2FrameStream) pushPromiseFrame.pushStream());
416             if (future == null) {
417                 final int streamId = pushPromiseFrame.stream().id();
418                 future = encoder().writePushPromise(ctx, streamId, pushPromiseFrame.pushStream().id(),
419                         pushPromiseFrame.http2Headers(), pushPromiseFrame.padding());
420 
421                 if (future.isDone()) {
422                     handleHeaderFuture(future, streamId);
423                 } else {
424                     numBufferedStreams++;
425                     // Clean up the stream being initialized if writing the headers fails and also
426                     // decrement the number of buffered streams.
427                     future.addListener(f -> {
428                         numBufferedStreams--;
429                         handleHeaderFuture(f, streamId);
430                     });
431                 }
432                 return future;
433             }
434             return future;
435         }
436     }
437 
438     private Future<Void> initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream) {
439         final Http2Connection connection = connection();
440         final int streamId = connection.local().incrementAndGetNextStreamId();
441         if (streamId < 0) {
442             Future<Void> f = ctx.newFailedFuture(new Http2NoMoreStreamIdsException());
443 
444             // Simulate a GOAWAY being received due to stream exhaustion on this connection. We use the maximum
445             // valid stream ID for the current peer.
446             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE :
447                     Integer.MAX_VALUE - 1, NO_ERROR.code(),
448                     ctx.bufferAllocator().copyOf("Stream IDs exhausted on local stream creation", US_ASCII).send()));
449 
450             return f;
451         }
452         http2FrameStream.id = streamId;
453 
454         // Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
455         // stream in a field directly we may override the stored field before onStreamAdded(...) was called
456         // and so not correctly set the property for the buffered stream.
457         //
458         // See https://github.com/netty/netty/issues/8692
459         Object old = frameStreamToInitializeMap.put(streamId, http2FrameStream);
460 
461         // We should not re-use ids.
462         assert old == null;
463         return null;
464     }
465 
466     private void handleHeaderFuture(Future<?> channelFuture, int streamId) {
467         if (channelFuture.isFailed()) {
468             frameStreamToInitializeMap.remove(streamId);
469         }
470     }
471 
472     private void onStreamActive0(Http2Stream stream) {
473         if (stream.id() != HTTP_UPGRADE_STREAM_ID &&
474                 connection().local().isValidStreamId(stream.id())) {
475             return;
476         }
477 
478         DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
479         onHttp2StreamStateChanged(ctx, stream2);
480     }
481 
482     private final class ConnectionListener extends Http2ConnectionAdapter {
483         @Override
484         public void onStreamAdded(Http2Stream stream) {
485             DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());
486 
487             if (frameStream != null) {
488                 frameStream.setStreamAndProperty(streamKey, stream);
489             }
490         }
491 
492         @Override
493         public void onStreamActive(Http2Stream stream) {
494             onStreamActive0(stream);
495         }
496 
497         @Override
498         public void onStreamClosed(Http2Stream stream) {
499             onHttp2StreamStateChanged0(stream);
500         }
501 
502         @Override
503         public void onStreamHalfClosed(Http2Stream stream) {
504             onHttp2StreamStateChanged0(stream);
505         }
506 
507         private void onHttp2StreamStateChanged0(Http2Stream stream) {
508             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
509             if (stream2 != null) {
510                 onHttp2StreamStateChanged(ctx, stream2);
511             }
512         }
513     }
514 
515     @Override
516     protected void onConnectionError(
517             ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
518         if (!outbound) {
519             // allow the user to handle it first in the pipeline, and then automatically clean up.
520             // If this is not desired behavior the user can override this method.
521             //
522             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
523             ctx.fireChannelExceptionCaught(cause);
524         }
525         super.onConnectionError(ctx, outbound, cause, http2Ex);
526     }
527 
528     /**
529      * Exceptions for unknown streams, that is streams that have no {@link Http2FrameStream} object attached
530      * are simply logged and replied to by sending a RST_STREAM frame.
531      */
532     @Override
533     protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
534                                        Http2Exception.StreamException streamException) {
535         int streamId = streamException.streamId();
536         Http2Stream connectionStream = connection().stream(streamId);
537         if (connectionStream == null) {
538             onHttp2UnknownStreamError(ctx, cause, streamException);
539             // Write a RST_STREAM
540             super.onStreamError(ctx, outbound, cause, streamException);
541             return;
542         }
543 
544         Http2FrameStream stream = connectionStream.getProperty(streamKey);
545         if (stream == null) {
546             LOG.warn("Stream exception thrown without stream object attached.", cause);
547             // Write a RST_STREAM
548             super.onStreamError(ctx, outbound, cause, streamException);
549             return;
550         }
551 
552         if (!outbound) {
553             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
554             onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
555         }
556     }
557 
558     private static void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx,
559             Throwable cause, Http2Exception.StreamException streamException) {
560         // We log here for debugging purposes. This exception will be propagated to the upper layers through other ways:
561         // - fireExceptionCaught
562         // - fireUserEventTriggered(Http2ResetFrame), see Http2MultiplexHandler#channelRead(...)
563         // - by failing write promise
564         // Receiver of the error is responsible for correct handling of this exception.
565         LOG.log(DEBUG, "Stream exception thrown for unknown stream {}.", streamException.streamId(), cause);
566     }
567 
568     @Override
569     protected final boolean isGracefulShutdownComplete() {
570         return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
571     }
572 
573     private final class FrameListener implements Http2FrameListener {
574 
575         @Override
576         public void onUnknownFrame(
577                 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, Buffer payload) {
578             if (streamId == 0) {
579                 // Ignore unknown frames on connection stream, for example: HTTP/2 GREASE testing
580                 return;
581             }
582             onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
583                     .stream(requireStream(streamId)).copy());
584         }
585 
586         @Override
587         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
588             onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
589         }
590 
591         @Override
592         public void onPingRead(ChannelHandlerContext ctx, long data) {
593             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
594         }
595 
596         @Override
597         public void onPingAckRead(ChannelHandlerContext ctx, long data) {
598             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
599         }
600 
601         @Override
602         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
603             onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
604         }
605 
606         @Override
607         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
608             if (streamId == 0) {
609                 // Ignore connection window updates.
610                 return;
611             }
612             onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
613         }
614 
615         @Override
616         public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
617                                   Http2Headers headers, int streamDependency, short weight, boolean
618                                           exclusive, int padding, boolean endStream) {
619             onHeadersRead(ctx, streamId, headers, padding, endStream);
620         }
621 
622         @Override
623         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
624                                   int padding, boolean endOfStream) {
625             onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
626                     .stream(requireStream(streamId)));
627         }
628 
629         @Override
630         public int onDataRead(ChannelHandlerContext ctx, int streamId, Buffer data, int padding,
631                               boolean endOfStream) {
632             onHttp2Frame(ctx, new DefaultHttp2DataFrame(data.send(), endOfStream, padding)
633                     .stream(requireStream(streamId)));
634             // We return the bytes in consumeBytes() once the stream channel consumed the bytes.
635             return 0;
636         }
637 
638         @Override
639         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, Buffer debugData) {
640             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData.send()));
641         }
642 
643         @Override
644         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
645                                    short weight, boolean exclusive) {
646 
647             Http2Stream stream = connection().stream(streamId);
648             if (stream == null) {
649                 // The stream was not opened yet, let's just ignore this for now.
650                 return;
651             }
652             onHttp2Frame(ctx, new DefaultHttp2PriorityFrame(streamDependency, weight, exclusive)
653                     .stream(requireStream(streamId)));
654         }
655 
656         @Override
657         public void onSettingsAckRead(ChannelHandlerContext ctx) {
658             onHttp2Frame(ctx, Http2SettingsAckFrame.INSTANCE);
659         }
660 
661         @Override
662         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
663                                       Http2Headers headers, int padding) {
664             onHttp2Frame(ctx, new DefaultHttp2PushPromiseFrame(headers, padding, promisedStreamId)
665                     .pushStream(new DefaultHttp2FrameStream()
666                             .setStreamAndProperty(streamKey, connection().stream(promisedStreamId)))
667                     .stream(requireStream(streamId)));
668         }
669 
670         private Http2FrameStream requireStream(int streamId) {
671             Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
672             if (stream == null) {
673                 throw new IllegalStateException("Stream object required for identifier: " + streamId);
674             }
675             return stream;
676         }
677     }
678 
679     private void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream,
680                                                  @SuppressWarnings("unused") boolean writable) {
681         ctx.fireChannelInboundEvent(stream.writabilityChanged);
682     }
683 
684     void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
685         ctx.fireChannelInboundEvent(stream.stateChanged);
686     }
687 
688     void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
689         ctx.fireChannelRead(frame);
690     }
691 
692     void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
693         ctx.fireChannelExceptionCaught(cause);
694     }
695 
696     private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
697         @Override
698         public void writabilityChanged(Http2Stream stream) {
699             DefaultHttp2FrameStream frameStream = stream.getProperty(streamKey);
700             if (frameStream == null) {
701                 return;
702             }
703             onHttp2StreamWritabilityChanged(
704                     ctx, frameStream, connection().remote().flowController().isWritable(stream));
705         }
706     }
707 
708     /**
709      * {@link Http2FrameStream} implementation.
710      */
711     // TODO(buchgr): Merge Http2FrameStream and Http2Stream.
712     static class DefaultHttp2FrameStream implements Http2FrameStream {
713 
714         private volatile int id = -1;
715         private volatile Http2Stream stream;
716 
717         final Http2FrameStreamEvent stateChanged = Http2FrameStreamEvent.stateChanged(this);
718         final Http2FrameStreamEvent writabilityChanged = Http2FrameStreamEvent.writabilityChanged(this);
719 
720         Channel attachment;
721 
722         DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
723             assert id == -1 || stream.id() == id;
724             this.stream = stream;
725             stream.setProperty(streamKey, this);
726             return this;
727         }
728 
729         @Override
730         public int id() {
731             Http2Stream stream = this.stream;
732             return stream == null ? id : stream.id();
733         }
734 
735         @Override
736         public State state() {
737             Http2Stream stream = this.stream;
738             return stream == null ? State.IDLE : stream.state();
739         }
740 
741         @Override
742         public String toString() {
743             return String.valueOf(id());
744         }
745     }
746 }