View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.channel.ChannelHandlerContext;
19  import io.netty.handler.codec.http.HttpHeaderNames;
20  import io.netty.handler.codec.http.HttpStatusClass;
21  import io.netty.handler.codec.http.HttpUtil;
22  import io.netty.handler.codec.http2.Http2Connection.Endpoint;
23  import io.netty.util.internal.UnstableApi;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.util.List;
28  
29  import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
30  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
31  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
32  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
33  import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
34  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
35  import static io.netty.handler.codec.http2.Http2Exception.streamError;
36  import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY;
37  import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
38  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
39  import static io.netty.util.internal.ObjectUtil.checkNotNull;
40  import static java.lang.Integer.MAX_VALUE;
41  import static java.lang.Math.min;
42  
43  /**
44   * Provides the default implementation for processing inbound frame events and delegates to a
45   * {@link Http2FrameListener}
46   * <p>
47   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
48   * <p>
49   * This interface enforces inbound flow control functionality through
50   * {@link Http2LocalFlowController}
51   */
52  @UnstableApi
53  public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
54      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2ConnectionDecoder.class);
55      private Http2FrameListener internalFrameListener = new PrefaceFrameListener();
56      private final Http2Connection connection;
57      private Http2LifecycleManager lifecycleManager;
58      private final Http2ConnectionEncoder encoder;
59      private final Http2FrameReader frameReader;
60      private Http2FrameListener listener;
61      private final Http2PromisedRequestVerifier requestVerifier;
62      private final Http2SettingsReceivedConsumer settingsReceivedConsumer;
63      private final boolean autoAckPing;
64      private final Http2Connection.PropertyKey contentLengthKey;
65  
66      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
67                                           Http2ConnectionEncoder encoder,
68                                           Http2FrameReader frameReader) {
69          this(connection, encoder, frameReader, ALWAYS_VERIFY);
70      }
71  
72      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
73                                           Http2ConnectionEncoder encoder,
74                                           Http2FrameReader frameReader,
75                                           Http2PromisedRequestVerifier requestVerifier) {
76          this(connection, encoder, frameReader, requestVerifier, true);
77      }
78  
79      /**
80       * Create a new instance.
81       * @param connection The {@link Http2Connection} associated with this decoder.
82       * @param encoder The {@link Http2ConnectionEncoder} associated with this decoder.
83       * @param frameReader Responsible for reading/parsing the raw frames. As opposed to this object which applies
84       *                    h2 semantics on top of the frames.
85       * @param requestVerifier Determines if push promised streams are valid.
86       * @param autoAckSettings {@code false} to disable automatically applying and sending settings acknowledge frame.
87       *  The {@code Http2ConnectionEncoder} is expected to be an instance of {@link Http2SettingsReceivedConsumer} and
88       *  will apply the earliest received but not yet ACKed SETTINGS when writing the SETTINGS ACKs.
89       * {@code true} to enable automatically applying and sending settings acknowledge frame.
90       */
91      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
92                                           Http2ConnectionEncoder encoder,
93                                           Http2FrameReader frameReader,
94                                           Http2PromisedRequestVerifier requestVerifier,
95                                           boolean autoAckSettings) {
96          this(connection, encoder, frameReader, requestVerifier, autoAckSettings, true);
97      }
98  
99      /**
100      * Create a new instance.
101      * @param connection The {@link Http2Connection} associated with this decoder.
102      * @param encoder The {@link Http2ConnectionEncoder} associated with this decoder.
103      * @param frameReader Responsible for reading/parsing the raw frames. As opposed to this object which applies
104      *                    h2 semantics on top of the frames.
105      * @param requestVerifier Determines if push promised streams are valid.
106      * @param autoAckSettings {@code false} to disable automatically applying and sending settings acknowledge frame.
107      *                        The {@code Http2ConnectionEncoder} is expected to be an instance of
108      *                        {@link Http2SettingsReceivedConsumer} and will apply the earliest received but not yet
109      *                        ACKed SETTINGS when writing the SETTINGS ACKs. {@code true} to enable automatically
110      *                        applying and sending settings acknowledge frame.
111      * @param autoAckPing {@code false} to disable automatically sending ping acknowledge frame. {@code true} to enable
112      *                    automatically sending ping ack frame.
113      */
114     public DefaultHttp2ConnectionDecoder(Http2Connection connection,
115                                          Http2ConnectionEncoder encoder,
116                                          Http2FrameReader frameReader,
117                                          Http2PromisedRequestVerifier requestVerifier,
118                                          boolean autoAckSettings,
119                                          boolean autoAckPing) {
120         this.autoAckPing = autoAckPing;
121         if (autoAckSettings) {
122             settingsReceivedConsumer = null;
123         } else {
124             if (!(encoder instanceof Http2SettingsReceivedConsumer)) {
125                 throw new IllegalArgumentException("disabling autoAckSettings requires the encoder to be a " +
126                         Http2SettingsReceivedConsumer.class);
127             }
128             settingsReceivedConsumer = (Http2SettingsReceivedConsumer) encoder;
129         }
130         this.connection = checkNotNull(connection, "connection");
131         contentLengthKey = this.connection.newKey();
132         this.frameReader = checkNotNull(frameReader, "frameReader");
133         this.encoder = checkNotNull(encoder, "encoder");
134         this.requestVerifier = checkNotNull(requestVerifier, "requestVerifier");
135         if (connection.local().flowController() == null) {
136             connection.local().flowController(new DefaultHttp2LocalFlowController(connection));
137         }
138         connection.local().flowController().frameWriter(encoder.frameWriter());
139     }
140 
141     @Override
142     public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
143         this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
144     }
145 
146     @Override
147     public Http2Connection connection() {
148         return connection;
149     }
150 
151     @Override
152     public final Http2LocalFlowController flowController() {
153         return connection.local().flowController();
154     }
155 
156     @Override
157     public void frameListener(Http2FrameListener listener) {
158         this.listener = checkNotNull(listener, "listener");
159     }
160 
161     @Override
162     public Http2FrameListener frameListener() {
163         return listener;
164     }
165 
166     @Override
167     public boolean prefaceReceived() {
168         return FrameReadListener.class == internalFrameListener.getClass();
169     }
170 
171     @Override
172     public void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Http2Exception {
173         frameReader.readFrame(ctx, in, internalFrameListener);
174     }
175 
176     @Override
177     public Http2Settings localSettings() {
178         Http2Settings settings = new Http2Settings();
179         Http2FrameReader.Configuration config = frameReader.configuration();
180         Http2HeadersDecoder.Configuration headersConfig = config.headersConfiguration();
181         Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
182         settings.initialWindowSize(flowController().initialWindowSize());
183         settings.maxConcurrentStreams(connection.remote().maxActiveStreams());
184         settings.headerTableSize(headersConfig.maxHeaderTableSize());
185         settings.maxFrameSize(frameSizePolicy.maxFrameSize());
186         settings.maxHeaderListSize(headersConfig.maxHeaderListSize());
187         if (!connection.isServer()) {
188             // Only set the pushEnabled flag if this is a client endpoint.
189             settings.pushEnabled(connection.local().allowPushTo());
190         }
191         return settings;
192     }
193 
194     @Override
195     public void close() {
196         frameReader.close();
197     }
198 
199     /**
200      * Calculate the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount.
201      * @param maxHeaderListSize
202      *      <a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_MAX_HEADER_LIST_SIZE</a> for the local
203      *      endpoint.
204      * @return the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount.
205      */
206     protected long calculateMaxHeaderListSizeGoAway(long maxHeaderListSize) {
207         return Http2CodecUtil.calculateMaxHeaderListSizeGoAway(maxHeaderListSize);
208     }
209 
210     private int unconsumedBytes(Http2Stream stream) {
211         return flowController().unconsumedBytes(stream);
212     }
213 
214     void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
215             throws Http2Exception {
216         listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
217         connection.goAwayReceived(lastStreamId, errorCode, debugData);
218     }
219 
220     void onUnknownFrame0(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
221             ByteBuf payload) throws Http2Exception {
222         listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
223     }
224 
225     // See https://tools.ietf.org/html/rfc7540#section-8.1.2.6
226     private void verifyContentLength(Http2Stream stream, int data, boolean isEnd) throws Http2Exception {
227         ContentLength contentLength = stream.getProperty(contentLengthKey);
228         if (contentLength != null) {
229             try {
230                 contentLength.increaseReceivedBytes(connection.isServer(), stream.id(), data, isEnd);
231             } finally {
232                 if (isEnd) {
233                     stream.removeProperty(contentLengthKey);
234                 }
235             }
236         }
237     }
238 
239     /**
240      * Handles all inbound frames from the network.
241      */
242     private final class FrameReadListener implements Http2FrameListener {
243         @Override
244         public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
245                               boolean endOfStream) throws Http2Exception {
246             Http2Stream stream = connection.stream(streamId);
247             Http2LocalFlowController flowController = flowController();
248             int readable = data.readableBytes();
249             int bytesToReturn = readable + padding;
250 
251             final boolean shouldIgnore;
252             try {
253                 shouldIgnore = shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "DATA");
254             } catch (Http2Exception e) {
255                 // Ignoring this frame. We still need to count the frame towards the connection flow control
256                 // window, but we immediately mark all bytes as consumed.
257                 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
258                 flowController.consumeBytes(stream, bytesToReturn);
259                 throw e;
260             } catch (Throwable t) {
261                 throw connectionError(INTERNAL_ERROR, t, "Unhandled error on data stream id %d", streamId);
262             }
263 
264             if (shouldIgnore) {
265                 // Ignoring this frame. We still need to count the frame towards the connection flow control
266                 // window, but we immediately mark all bytes as consumed.
267                 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
268                 flowController.consumeBytes(stream, bytesToReturn);
269 
270                 // Verify that the stream may have existed after we apply flow control.
271                 verifyStreamMayHaveExisted(streamId);
272 
273                 // All bytes have been consumed.
274                 return bytesToReturn;
275             }
276             Http2Exception error = null;
277             switch (stream.state()) {
278                 case OPEN:
279                 case HALF_CLOSED_LOCAL:
280                     break;
281                 case HALF_CLOSED_REMOTE:
282                 case CLOSED:
283                     error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
284                         stream.id(), stream.state());
285                     break;
286                 default:
287                     error = streamError(stream.id(), PROTOCOL_ERROR,
288                         "Stream %d in unexpected state: %s", stream.id(), stream.state());
289                     break;
290             }
291 
292             int unconsumedBytes = unconsumedBytes(stream);
293             try {
294                 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
295                 // Update the unconsumed bytes after flow control is applied.
296                 unconsumedBytes = unconsumedBytes(stream);
297 
298                 // If the stream is in an invalid state to receive the frame, throw the error.
299                 if (error != null) {
300                     throw error;
301                 }
302 
303                 verifyContentLength(stream, readable, endOfStream);
304 
305                 // Call back the application and retrieve the number of bytes that have been
306                 // immediately processed.
307                 bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
308 
309                 if (endOfStream) {
310                     lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
311                 }
312 
313                 return bytesToReturn;
314             } catch (Http2Exception e) {
315                 // If an exception happened during delivery, the listener may have returned part
316                 // of the bytes before the error occurred. If that's the case, subtract that from
317                 // the total processed bytes so that we don't return too many bytes.
318                 int delta = unconsumedBytes - unconsumedBytes(stream);
319                 bytesToReturn -= delta;
320                 throw e;
321             } catch (RuntimeException e) {
322                 // If an exception happened during delivery, the listener may have returned part
323                 // of the bytes before the error occurred. If that's the case, subtract that from
324                 // the total processed bytes so that we don't return too many bytes.
325                 int delta = unconsumedBytes - unconsumedBytes(stream);
326                 bytesToReturn -= delta;
327                 throw e;
328             } finally {
329                 // If appropriate, return the processed bytes to the flow controller.
330                 flowController.consumeBytes(stream, bytesToReturn);
331             }
332         }
333 
334         @Override
335         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
336                 boolean endOfStream) throws Http2Exception {
337             onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream);
338         }
339 
340         @Override
341         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
342                 short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
343             Http2Stream stream = connection.stream(streamId);
344             boolean allowHalfClosedRemote = false;
345             boolean isTrailers = false;
346             if (stream == null && !connection.streamMayHaveExisted(streamId)) {
347                 stream = connection.remote().createStream(streamId, endOfStream);
348                 // Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.
349                 allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;
350             } else if (stream != null) {
351                 isTrailers = stream.isHeadersReceived();
352             }
353 
354             if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "HEADERS")) {
355                 return;
356             }
357 
358             boolean isInformational = !connection.isServer() &&
359                     HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
360             if ((isInformational || !endOfStream) && stream.isHeadersReceived() || stream.isTrailersReceived()) {
361                 throw streamError(streamId, PROTOCOL_ERROR,
362                                   "Stream %d received too many headers EOS: %s state: %s",
363                                   streamId, endOfStream, stream.state());
364             }
365 
366             switch (stream.state()) {
367                 case RESERVED_REMOTE:
368                     stream.open(endOfStream);
369                     break;
370                 case OPEN:
371                 case HALF_CLOSED_LOCAL:
372                     // Allowed to receive headers in these states.
373                     break;
374                 case HALF_CLOSED_REMOTE:
375                     if (!allowHalfClosedRemote) {
376                         throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
377                                 stream.id(), stream.state());
378                     }
379                     break;
380                 case CLOSED:
381                     throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
382                             stream.id(), stream.state());
383                 default:
384                     // Connection error.
385                     throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),
386                             stream.state());
387             }
388 
389             if (!isTrailers) {
390                 // extract the content-length header
391                 List<? extends CharSequence> contentLength = headers.getAll(HttpHeaderNames.CONTENT_LENGTH);
392                 if (contentLength != null && !contentLength.isEmpty()) {
393                     try {
394                         long cLength = HttpUtil.normalizeAndGetContentLength(contentLength, false, true);
395                         if (cLength != -1) {
396                             headers.setLong(HttpHeaderNames.CONTENT_LENGTH, cLength);
397                             stream.setProperty(contentLengthKey, new ContentLength(cLength));
398                         }
399                     } catch (IllegalArgumentException e) {
400                         throw streamError(stream.id(), PROTOCOL_ERROR, e,
401                                 "Multiple content-length headers received");
402                     }
403                 }
404             }
405 
406             stream.headersReceived(isInformational);
407             verifyContentLength(stream, 0, endOfStream);
408             encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
409             listener.onHeadersRead(ctx, streamId, headers, streamDependency,
410                     weight, exclusive, padding, endOfStream);
411             // If the headers completes this stream, close it.
412             if (endOfStream) {
413                 lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
414             }
415         }
416 
417         @Override
418         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
419                 boolean exclusive) throws Http2Exception {
420             encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
421 
422             listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
423         }
424 
425         @Override
426         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
427             Http2Stream stream = connection.stream(streamId);
428             if (stream == null) {
429                 verifyStreamMayHaveExisted(streamId);
430                 return;
431             }
432 
433             switch(stream.state()) {
434             case IDLE:
435                 throw connectionError(PROTOCOL_ERROR, "RST_STREAM received for IDLE stream %d", streamId);
436             case CLOSED:
437                 return; // RST_STREAM frames must be ignored for closed streams.
438             default:
439                 break;
440             }
441 
442             listener.onRstStreamRead(ctx, streamId, errorCode);
443 
444             lifecycleManager.closeStream(stream, ctx.newSucceededFuture());
445         }
446 
447         @Override
448         public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
449             // Apply oldest outstanding local settings here. This is a synchronization point between endpoints.
450             Http2Settings settings = encoder.pollSentSettings();
451 
452             if (settings != null) {
453                 applyLocalSettings(settings);
454             }
455 
456             listener.onSettingsAckRead(ctx);
457         }
458 
459         /**
460          * Applies settings sent from the local endpoint.
461          * <p>
462          * This method is only called after the local settings have been acknowledged from the remote endpoint.
463          */
464         private void applyLocalSettings(Http2Settings settings) throws Http2Exception {
465             Boolean pushEnabled = settings.pushEnabled();
466             final Http2FrameReader.Configuration config = frameReader.configuration();
467             final Http2HeadersDecoder.Configuration headerConfig = config.headersConfiguration();
468             final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
469             if (pushEnabled != null) {
470                 if (connection.isServer()) {
471                     throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
472                 }
473                 connection.local().allowPushTo(pushEnabled);
474             }
475 
476             Long maxConcurrentStreams = settings.maxConcurrentStreams();
477             if (maxConcurrentStreams != null) {
478                 connection.remote().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
479             }
480 
481             Long headerTableSize = settings.headerTableSize();
482             if (headerTableSize != null) {
483                 headerConfig.maxHeaderTableSize(headerTableSize);
484             }
485 
486             Long maxHeaderListSize = settings.maxHeaderListSize();
487             if (maxHeaderListSize != null) {
488                 headerConfig.maxHeaderListSize(maxHeaderListSize, calculateMaxHeaderListSizeGoAway(maxHeaderListSize));
489             }
490 
491             Integer maxFrameSize = settings.maxFrameSize();
492             if (maxFrameSize != null) {
493                 frameSizePolicy.maxFrameSize(maxFrameSize);
494             }
495 
496             Integer initialWindowSize = settings.initialWindowSize();
497             if (initialWindowSize != null) {
498                 flowController().initialWindowSize(initialWindowSize);
499             }
500         }
501 
502         @Override
503         public void onSettingsRead(final ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
504             if (settingsReceivedConsumer == null) {
505                 // Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
506                 // remote peer applies these settings before any subsequent frames that we may send which depend upon
507                 // these new settings. See https://github.com/netty/netty/issues/6520.
508                 encoder.writeSettingsAck(ctx, ctx.newPromise());
509 
510                 encoder.remoteSettings(settings);
511             } else {
512                 settingsReceivedConsumer.consumeReceivedSettings(settings);
513             }
514 
515             listener.onSettingsRead(ctx, settings);
516         }
517 
518         @Override
519         public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
520             if (autoAckPing) {
521                 // Send an ack back to the remote client.
522                 encoder.writePing(ctx, true, data, ctx.newPromise());
523             }
524             listener.onPingRead(ctx, data);
525         }
526 
527         @Override
528         public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
529             listener.onPingAckRead(ctx, data);
530         }
531 
532         @Override
533         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
534                 Http2Headers headers, int padding) throws Http2Exception {
535             // A client cannot push.
536             if (connection().isServer()) {
537                 throw connectionError(PROTOCOL_ERROR, "A client cannot push.");
538             }
539 
540             Http2Stream parentStream = connection.stream(streamId);
541 
542             if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, parentStream, "PUSH_PROMISE")) {
543                 return;
544             }
545 
546             switch (parentStream.state()) {
547               case OPEN:
548               case HALF_CLOSED_LOCAL:
549                   // Allowed to receive push promise in these states.
550                   break;
551               default:
552                   // Connection error.
553                   throw connectionError(PROTOCOL_ERROR,
554                       "Stream %d in unexpected state for receiving push promise: %s",
555                       parentStream.id(), parentStream.state());
556             }
557 
558             if (!requestVerifier.isAuthoritative(ctx, headers)) {
559                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
560                         "Promised request on stream %d for promised stream %d is not authoritative",
561                         streamId, promisedStreamId);
562             }
563             if (!requestVerifier.isCacheable(headers)) {
564                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
565                         "Promised request on stream %d for promised stream %d is not known to be cacheable",
566                         streamId, promisedStreamId);
567             }
568             if (!requestVerifier.isSafe(headers)) {
569                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
570                         "Promised request on stream %d for promised stream %d is not known to be safe",
571                         streamId, promisedStreamId);
572             }
573 
574             // Reserve the push stream based with a priority based on the current stream's priority.
575             connection.remote().reservePushStream(promisedStreamId, parentStream);
576 
577             listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
578         }
579 
580         @Override
581         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
582                 throws Http2Exception {
583             onGoAwayRead0(ctx, lastStreamId, errorCode, debugData);
584         }
585 
586         @Override
587         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
588                 throws Http2Exception {
589             Http2Stream stream = connection.stream(streamId);
590             if (stream == null || stream.state() == CLOSED || streamCreatedAfterGoAwaySent(streamId)) {
591                 // Ignore this frame.
592                 verifyStreamMayHaveExisted(streamId);
593                 return;
594             }
595 
596             // Update the outbound flow control window.
597             encoder.flowController().incrementWindowSize(stream, windowSizeIncrement);
598 
599             listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
600         }
601 
602         @Override
603         public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
604                 ByteBuf payload) throws Http2Exception {
605             onUnknownFrame0(ctx, frameType, streamId, flags, payload);
606         }
607 
608         /**
609          * Helper method to determine if a frame that has the semantics of headers or data should be ignored for the
610          * {@code stream} (which may be {@code null}) associated with {@code streamId}.
611          */
612         private boolean shouldIgnoreHeadersOrDataFrame(ChannelHandlerContext ctx, int streamId, Http2Stream stream,
613                 String frameName) throws Http2Exception {
614             if (stream == null) {
615                 if (streamCreatedAfterGoAwaySent(streamId)) {
616                     logger.info("{} ignoring {} frame for stream {}. Stream sent after GOAWAY sent",
617                             ctx.channel(), frameName, streamId);
618                     return true;
619                 }
620 
621                 // Make sure it's not an out-of-order frame, like a rogue DATA frame, for a stream that could
622                 // never have existed.
623                 verifyStreamMayHaveExisted(streamId);
624 
625                 // Its possible that this frame would result in stream ID out of order creation (PROTOCOL ERROR) and its
626                 // also possible that this frame is received on a CLOSED stream (STREAM_CLOSED after a RST_STREAM is
627                 // sent). We don't have enough information to know for sure, so we choose the lesser of the two errors.
628                 throw streamError(streamId, STREAM_CLOSED, "Received %s frame for an unknown stream %d",
629                                   frameName, streamId);
630             }
631             if (stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
632                 // If we have sent a reset stream it is assumed the stream will be closed after the write completes.
633                 // If we have not sent a reset, but the stream was created after a GoAway this is not supported by
634                 // DefaultHttp2Connection and if a custom Http2Connection is used it is assumed the lifetime is managed
635                 // elsewhere so we don't close the stream or otherwise modify the stream's state.
636 
637                 if (logger.isInfoEnabled()) {
638                     logger.info("{} ignoring {} frame for stream {}", ctx.channel(), frameName,
639                             stream.isResetSent() ? "RST_STREAM sent." :
640                                     "Stream created after GOAWAY sent. Last known stream by peer " +
641                                      connection.remote().lastStreamKnownByPeer());
642                 }
643 
644                 return true;
645             }
646             return false;
647         }
648 
649         /**
650          * Helper method for determining whether or not to ignore inbound frames. A stream is considered to be created
651          * after a {@code GOAWAY} is sent if the following conditions hold:
652          * <p/>
653          * <ul>
654          *     <li>A {@code GOAWAY} must have been sent by the local endpoint</li>
655          *     <li>The {@code streamId} must identify a legitimate stream id for the remote endpoint to be creating</li>
656          *     <li>{@code streamId} is greater than the Last Known Stream ID which was sent by the local endpoint
657          *     in the last {@code GOAWAY} frame</li>
658          * </ul>
659          * <p/>
660          */
661         private boolean streamCreatedAfterGoAwaySent(int streamId) {
662             Endpoint<?> remote = connection.remote();
663             return connection.goAwaySent() && remote.isValidStreamId(streamId) &&
664                     streamId > remote.lastStreamKnownByPeer();
665         }
666 
667         private void verifyStreamMayHaveExisted(int streamId) throws Http2Exception {
668             if (!connection.streamMayHaveExisted(streamId)) {
669                 throw connectionError(PROTOCOL_ERROR, "Stream %d does not exist", streamId);
670             }
671         }
672     }
673 
674     private final class PrefaceFrameListener implements Http2FrameListener {
675         /**
676          * Verifies that the HTTP/2 connection preface has been received from the remote endpoint.
677          * It is possible that the current call to
678          * {@link Http2FrameReader#readFrame(ChannelHandlerContext, ByteBuf, Http2FrameListener)} will have multiple
679          * frames to dispatch. So it may be OK for this class to get legitimate frames for the first readFrame.
680          */
681         private void verifyPrefaceReceived() throws Http2Exception {
682             if (!prefaceReceived()) {
683                 throw connectionError(PROTOCOL_ERROR, "Received non-SETTINGS as first frame.");
684             }
685         }
686 
687         @Override
688         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
689                 throws Http2Exception {
690             verifyPrefaceReceived();
691             return internalFrameListener.onDataRead(ctx, streamId, data, padding, endOfStream);
692         }
693 
694         @Override
695         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
696                 boolean endOfStream) throws Http2Exception {
697             verifyPrefaceReceived();
698             internalFrameListener.onHeadersRead(ctx, streamId, headers, padding, endOfStream);
699         }
700 
701         @Override
702         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
703                 short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
704             verifyPrefaceReceived();
705             internalFrameListener.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
706                     exclusive, padding, endOfStream);
707         }
708 
709         @Override
710         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
711                 boolean exclusive) throws Http2Exception {
712             verifyPrefaceReceived();
713             internalFrameListener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
714         }
715 
716         @Override
717         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
718             verifyPrefaceReceived();
719             internalFrameListener.onRstStreamRead(ctx, streamId, errorCode);
720         }
721 
722         @Override
723         public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
724             verifyPrefaceReceived();
725             internalFrameListener.onSettingsAckRead(ctx);
726         }
727 
728         @Override
729         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
730             // The first settings should change the internalFrameListener to the "real" listener
731             // that expects the preface to be verified.
732             if (!prefaceReceived()) {
733                 internalFrameListener = new FrameReadListener();
734             }
735             internalFrameListener.onSettingsRead(ctx, settings);
736         }
737 
738         @Override
739         public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
740             verifyPrefaceReceived();
741             internalFrameListener.onPingRead(ctx, data);
742         }
743 
744         @Override
745         public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
746             verifyPrefaceReceived();
747             internalFrameListener.onPingAckRead(ctx, data);
748         }
749 
750         @Override
751         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
752                 Http2Headers headers, int padding) throws Http2Exception {
753             verifyPrefaceReceived();
754             internalFrameListener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
755         }
756 
757         @Override
758         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
759                 throws Http2Exception {
760             onGoAwayRead0(ctx, lastStreamId, errorCode, debugData);
761         }
762 
763         @Override
764         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
765                 throws Http2Exception {
766             verifyPrefaceReceived();
767             internalFrameListener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
768         }
769 
770         @Override
771         public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
772                 ByteBuf payload) throws Http2Exception {
773             onUnknownFrame0(ctx, frameType, streamId, flags, payload);
774         }
775     }
776 
777     private static final class ContentLength {
778         private final long expected;
779         private long seen;
780 
781         ContentLength(long expected) {
782             this.expected = expected;
783         }
784 
785         void increaseReceivedBytes(boolean server, int streamId, int bytes, boolean isEnd) throws Http2Exception {
786             seen += bytes;
787             // Check for overflow
788             if (seen < 0) {
789                 throw streamError(streamId, PROTOCOL_ERROR,
790                         "Received amount of data did overflow and so not match content-length header %d", expected);
791             }
792             // Check if we received more data then what was advertised via the content-length header.
793             if (seen > expected) {
794                 throw streamError(streamId, PROTOCOL_ERROR,
795                         "Received amount of data %d does not match content-length header %d", seen, expected);
796             }
797 
798             if (isEnd) {
799                 if (seen == 0 && !server) {
800                     // This may be a response to a HEAD request, let's just allow it.
801                     return;
802                 }
803 
804                 // Check that we really saw what was told via the content-length header.
805                 if (expected > seen) {
806                     throw streamError(streamId, PROTOCOL_ERROR,
807                             "Received amount of data %d does not match content-length header %d", seen, expected);
808                 }
809             }
810         }
811     }
812 }