View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.channel.ChannelHandlerContext;
19  import io.netty.handler.codec.http.HttpHeaderNames;
20  import io.netty.handler.codec.http.HttpStatusClass;
21  import io.netty.handler.codec.http.HttpUtil;
22  import io.netty.handler.codec.http2.Http2Connection.Endpoint;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map.Entry;
29  
30  import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
31  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
32  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
33  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
34  import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
35  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
36  import static io.netty.handler.codec.http2.Http2Exception.streamError;
37  import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY;
38  import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
39  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
40  import static io.netty.util.internal.ObjectUtil.checkNotNull;
41  import static java.lang.Integer.MAX_VALUE;
42  import static java.lang.Math.min;
43  
44  /**
45   * Provides the default implementation for processing inbound frame events and delegates to a
46   * {@link Http2FrameListener}
47   * <p>
48   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
49   * <p>
50   * This class enforces inbound flow control functionality through
51   * {@link Http2LocalFlowController}
52   */
53  public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
54      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2ConnectionDecoder.class);
55      private Http2FrameListener internalFrameListener = new PrefaceFrameListener();
56      private final Http2Connection connection;
57      private Http2LifecycleManager lifecycleManager;
58      private final Http2ConnectionEncoder encoder;
59      private final Http2FrameReader frameReader;
60      private Http2FrameListener listener;
61      private final Http2PromisedRequestVerifier requestVerifier;
62      private final Http2SettingsReceivedConsumer settingsReceivedConsumer;
63      private final boolean autoAckPing;
64      private final Http2Connection.PropertyKey contentLengthKey;
65      private final boolean validateHeaders;
66  
67      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
68                                           Http2ConnectionEncoder encoder,
69                                           Http2FrameReader frameReader) {
70          this(connection, encoder, frameReader, ALWAYS_VERIFY);
71      }
72  
73      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
74                                           Http2ConnectionEncoder encoder,
75                                           Http2FrameReader frameReader,
76                                           Http2PromisedRequestVerifier requestVerifier) {
77          this(connection, encoder, frameReader, requestVerifier, true);
78      }
79  
80      /**
81       * Create a new instance.
82       * @param connection The {@link Http2Connection} associated with this decoder.
83       * @param encoder The {@link Http2ConnectionEncoder} associated with this decoder.
84       * @param frameReader Responsible for reading/parsing the raw frames. As opposed to this object which applies
85       *                    h2 semantics on top of the frames.
86       * @param requestVerifier Determines if push promised streams are valid.
87       * @param autoAckSettings {@code false} to disable automatically applying and sending settings acknowledge frame.
88       *  The {@code Http2ConnectionEncoder} is expected to be an instance of {@link Http2SettingsReceivedConsumer} and
89       *  will apply the earliest received but not yet ACKed SETTINGS when writing the SETTINGS ACKs.
90       * {@code true} to enable automatically applying and sending settings acknowledge frame.
91       */
92      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
93                                           Http2ConnectionEncoder encoder,
94                                           Http2FrameReader frameReader,
95                                           Http2PromisedRequestVerifier requestVerifier,
96                                           boolean autoAckSettings) {
97          this(connection, encoder, frameReader, requestVerifier, autoAckSettings, true);
98      }
99  
100     @Deprecated
101     public DefaultHttp2ConnectionDecoder(Http2Connection connection,
102                                          Http2ConnectionEncoder encoder,
103                                          Http2FrameReader frameReader,
104                                          Http2PromisedRequestVerifier requestVerifier,
105                                          boolean autoAckSettings,
106                                          boolean autoAckPing) {
107         this(connection, encoder, frameReader, requestVerifier, autoAckSettings, autoAckPing, true);
108     }
109 
110     /**
111      * Create a new instance.
112      * @param connection The {@link Http2Connection} associated with this decoder.
113      * @param encoder The {@link Http2ConnectionEncoder} associated with this decoder.
114      * @param frameReader Responsible for reading/parsing the raw frames. As opposed to this object which applies
115      *                    h2 semantics on top of the frames.
116      * @param requestVerifier Determines if push promised streams are valid.
117      * @param autoAckSettings {@code false} to disable automatically applying and sending settings acknowledge frame.
118      *                        The {@code Http2ConnectionEncoder} is expected to be an instance of
119      *                        {@link Http2SettingsReceivedConsumer} and will apply the earliest received but not yet
120      *                        ACKed SETTINGS when writing the SETTINGS ACKs. {@code true} to enable automatically
121      *                        applying and sending settings acknowledge frame.
122      * @param autoAckPing {@code false} to disable automatically sending ping acknowledge frame. {@code true} to enable
123      *                    automatically sending ping ack frame.
124      */
125     public DefaultHttp2ConnectionDecoder(Http2Connection connection,
126                                          Http2ConnectionEncoder encoder,
127                                          Http2FrameReader frameReader,
128                                          Http2PromisedRequestVerifier requestVerifier,
129                                          boolean autoAckSettings,
130                                          boolean autoAckPing,
131                                          boolean validateHeaders) {
132         this.validateHeaders = validateHeaders;
133         this.autoAckPing = autoAckPing;
134         if (autoAckSettings) {
135             settingsReceivedConsumer = null;
136         } else {
137             if (!(encoder instanceof Http2SettingsReceivedConsumer)) {
138                 throw new IllegalArgumentException("disabling autoAckSettings requires the encoder to be a " +
139                         Http2SettingsReceivedConsumer.class);
140             }
141             settingsReceivedConsumer = (Http2SettingsReceivedConsumer) encoder;
142         }
143         this.connection = checkNotNull(connection, "connection");
144         contentLengthKey = this.connection.newKey();
145         this.frameReader = checkNotNull(frameReader, "frameReader");
146         this.encoder = checkNotNull(encoder, "encoder");
147         this.requestVerifier = checkNotNull(requestVerifier, "requestVerifier");
148         if (connection.local().flowController() == null) {
149             connection.local().flowController(new DefaultHttp2LocalFlowController(connection));
150         }
151         connection.local().flowController().frameWriter(encoder.frameWriter());
152     }
153 
154     @Override
155     public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
156         this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
157     }
158 
159     @Override
160     public Http2Connection connection() {
161         return connection;
162     }
163 
164     @Override
165     public final Http2LocalFlowController flowController() {
166         return connection.local().flowController();
167     }
168 
169     @Override
170     public void frameListener(Http2FrameListener listener) {
171         this.listener = checkNotNull(listener, "listener");
172     }
173 
174     @Override
175     public Http2FrameListener frameListener() {
176         return listener;
177     }
178 
179     @Override
180     public boolean prefaceReceived() {
181         return FrameReadListener.class == internalFrameListener.getClass();
182     }
183 
184     @Override
185     public void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Http2Exception {
186         frameReader.readFrame(ctx, in, internalFrameListener);
187     }
188 
189     @Override
190     public Http2Settings localSettings() {
191         Http2Settings settings = new Http2Settings();
192         Http2FrameReader.Configuration config = frameReader.configuration();
193         Http2HeadersDecoder.Configuration headersConfig = config.headersConfiguration();
194         Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
195         settings.initialWindowSize(flowController().initialWindowSize());
196         settings.maxConcurrentStreams(connection.remote().maxActiveStreams());
197         settings.headerTableSize(headersConfig.maxHeaderTableSize());
198         settings.maxFrameSize(frameSizePolicy.maxFrameSize());
199         settings.maxHeaderListSize(headersConfig.maxHeaderListSize());
200         if (!connection.isServer()) {
201             // Only set the pushEnabled flag if this is a client endpoint.
202             settings.pushEnabled(connection.local().allowPushTo());
203         }
204         return settings;
205     }
206 
207     @Override
208     public void close() {
209         frameReader.close();
210     }
211 
212     /**
213      * Calculate the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount.
214      * @param maxHeaderListSize
215      *      <a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_MAX_HEADER_LIST_SIZE</a> for the local
216      *      endpoint.
217      * @return the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount.
218      */
219     protected long calculateMaxHeaderListSizeGoAway(long maxHeaderListSize) {
220         return Http2CodecUtil.calculateMaxHeaderListSizeGoAway(maxHeaderListSize);
221     }
222 
223     private int unconsumedBytes(Http2Stream stream) {
224         return flowController().unconsumedBytes(stream);
225     }
226 
227     void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
228             throws Http2Exception {
229         listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
230         connection.goAwayReceived(lastStreamId, errorCode, debugData);
231     }
232 
233     void onUnknownFrame0(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
234             ByteBuf payload) throws Http2Exception {
235         listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
236     }
237 
238     // See https://tools.ietf.org/html/rfc7540#section-8.1.2.6
239     private void verifyContentLength(Http2Stream stream, int data, boolean isEnd) throws Http2Exception {
240         ContentLength contentLength = stream.getProperty(contentLengthKey);
241         if (contentLength != null) {
242             try {
243                 contentLength.increaseReceivedBytes(connection.isServer(), stream.id(), data, isEnd);
244             } finally {
245                 if (isEnd) {
246                     stream.removeProperty(contentLengthKey);
247                 }
248             }
249         }
250     }
251 
252     /**
253      * Handles all inbound frames from the network.
254      */
255     private final class FrameReadListener implements Http2FrameListener {
        /**
         * Handles an inbound DATA frame. The frame is always passed to the local flow controller, including when
         * it is ignored or rejected; only frames for streams in an acceptable state reach the application
         * listener.
         *
         * @return the number of bytes considered processed: {@code readable + padding} for an ignored frame,
         *         otherwise the value returned by the application listener.
         * @throws Http2Exception if the frame is not acceptable for the stream, or if the listener throws.
         */
        @Override
        public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
                              boolean endOfStream) throws Http2Exception {
            // The stream may be null here; shouldIgnoreHeadersOrDataFrame handles that case.
            Http2Stream stream = connection.stream(streamId);
            Http2LocalFlowController flowController = flowController();
            int readable = data.readableBytes();
            // Until the listener says otherwise, the whole frame (payload plus padding) counts as processed.
            int bytesToReturn = readable + padding;

            final boolean shouldIgnore;
            try {
                shouldIgnore = shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, endOfStream, "DATA");
            } catch (Http2Exception e) {
                // Ignoring this frame. We still need to count the frame towards the connection flow control
                // window, but we immediately mark all bytes as consumed.
                flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
                flowController.consumeBytes(stream, bytesToReturn);
                throw e;
            } catch (Throwable t) {
                // Anything other than an Http2Exception is reported as an internal connection error.
                throw connectionError(INTERNAL_ERROR, t, "Unhandled error on data stream id %d", streamId);
            }

            if (shouldIgnore) {
                // Ignoring this frame. We still need to count the frame towards the connection flow control
                // window, but we immediately mark all bytes as consumed.
                flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
                flowController.consumeBytes(stream, bytesToReturn);

                // Verify that the stream may have existed after we apply flow control.
                verifyStreamMayHaveExisted(streamId, endOfStream, "DATA");

                // All bytes have been consumed.
                return bytesToReturn;
            }
            // Decide whether the stream state permits DATA, but defer throwing until flow control has been
            // applied to the frame below.
            Http2Exception error = null;
            switch (stream.state()) {
                case OPEN:
                case HALF_CLOSED_LOCAL:
                    break;
                case HALF_CLOSED_REMOTE:
                case CLOSED:
                    error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
                        stream.id(), stream.state());
                    break;
                default:
                    error = streamError(stream.id(), PROTOCOL_ERROR,
                        "Stream %d in unexpected state: %s", stream.id(), stream.state());
                    break;
            }

            int unconsumedBytes = unconsumedBytes(stream);
            try {
                flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
                // Update the unconsumed bytes after flow control is applied.
                unconsumedBytes = unconsumedBytes(stream);

                // If the stream is in an invalid state to receive the frame, throw the error.
                if (error != null) {
                    throw error;
                }

                // Only the payload bytes (not the padding) count towards the declared content length.
                verifyContentLength(stream, readable, endOfStream);

                // Call back the application and retrieve the number of bytes that have been
                // immediately processed.
                bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream);

                if (endOfStream) {
                    lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
                }

                return bytesToReturn;
            } catch (Http2Exception | RuntimeException e) {
                // If an exception happened during delivery, the listener may have returned part
                // of the bytes before the error occurred. If that's the case, subtract that from
                // the total processed bytes so that we don't return too many bytes.
                int delta = unconsumedBytes - unconsumedBytes(stream);
                bytesToReturn -= delta;
                throw e;
            } finally {
                // If appropriate, return the processed bytes to the flow controller.
                flowController.consumeBytes(stream, bytesToReturn);
            }
        }
339 
340         @Override
341         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
342                 boolean endOfStream) throws Http2Exception {
343             onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream);
344         }
345 
346         @Override
347         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
348                 short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
349             Http2Stream stream = connection.stream(streamId);
350             boolean allowHalfClosedRemote = false;
351             boolean isTrailers = false;
352             if (stream == null && !connection.streamMayHaveExisted(streamId)) {
353                 stream = connection.remote().createStream(streamId, endOfStream);
354                 // Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.
355                 allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;
356             } else if (stream != null) {
357                 isTrailers = stream.isHeadersReceived();
358             }
359 
360             if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, endOfStream, "HEADERS")) {
361                 return;
362             }
363 
364             boolean isInformational = !connection.isServer() &&
365                     HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
366             if ((isInformational || !endOfStream) && stream.isHeadersReceived() || stream.isTrailersReceived()) {
367                 throw streamError(streamId, PROTOCOL_ERROR,
368                                   "Stream %d received too many headers EOS: %s state: %s",
369                                   streamId, endOfStream, stream.state());
370             }
371 
372             switch (stream.state()) {
373                 case RESERVED_REMOTE:
374                     stream.open(endOfStream);
375                     break;
376                 case OPEN:
377                 case HALF_CLOSED_LOCAL:
378                     // Allowed to receive headers in these states.
379                     break;
380                 case HALF_CLOSED_REMOTE:
381                     if (!allowHalfClosedRemote) {
382                         throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
383                                 stream.id(), stream.state());
384                     }
385                     break;
386                 case CLOSED:
387                     throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
388                             stream.id(), stream.state());
389                 default:
390                     // Connection error.
391                     throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),
392                             stream.state());
393             }
394 
395             if (!isTrailers) {
396                 // extract the content-length header
397                 List<? extends CharSequence> contentLength = headers.getAll(HttpHeaderNames.CONTENT_LENGTH);
398                 if (contentLength != null && !contentLength.isEmpty()) {
399                     try {
400                         long cLength = HttpUtil.normalizeAndGetContentLength(contentLength, false, true);
401                         if (cLength != -1) {
402                             headers.setLong(HttpHeaderNames.CONTENT_LENGTH, cLength);
403                             stream.setProperty(contentLengthKey, new ContentLength(cLength));
404                         }
405                     } catch (IllegalArgumentException e) {
406                         throw streamError(stream.id(), PROTOCOL_ERROR, e,
407                                 "Multiple content-length headers received");
408                     }
409                 }
410                 // Use size() instead of isEmpty() for backward compatibility with grpc-java prior to 1.59.1,
411                 // see https://github.com/grpc/grpc-java/issues/10665
412             } else if (validateHeaders && headers.size() > 0) {
413                 // Need to check trailers don't contain pseudo headers. According to RFC 9113
414                 // Trailers MUST NOT include pseudo-header fields (Section 8.3).
415                 for (Iterator<Entry<CharSequence, CharSequence>> iterator =
416                     headers.iterator(); iterator.hasNext();) {
417                     CharSequence name = iterator.next().getKey();
418                     if (Http2Headers.PseudoHeaderName.hasPseudoHeaderFormat(name)) {
419                         throw streamError(stream.id(), PROTOCOL_ERROR,
420                                 "Found invalid Pseudo-Header in trailers: %s", name);
421                     }
422                 }
423             }
424 
425             stream.headersReceived(isInformational);
426             verifyContentLength(stream, 0, endOfStream);
427             encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
428             listener.onHeadersRead(ctx, streamId, headers, streamDependency,
429                     weight, exclusive, padding, endOfStream);
430             // If the headers completes this stream, close it.
431             if (endOfStream) {
432                 lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
433             }
434         }
435 
436         @Override
437         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
438                 boolean exclusive) throws Http2Exception {
439             encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
440 
441             listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
442         }
443 
444         @Override
445         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
446             Http2Stream stream = connection.stream(streamId);
447             if (stream == null) {
448                 verifyStreamMayHaveExisted(streamId, false, "RST_STREAM");
449                 return;
450             }
451 
452             switch(stream.state()) {
453             case IDLE:
454                 throw connectionError(PROTOCOL_ERROR, "RST_STREAM received for IDLE stream %d", streamId);
455             case CLOSED:
456                 return; // RST_STREAM frames must be ignored for closed streams.
457             default:
458                 break;
459             }
460 
461             listener.onRstStreamRead(ctx, streamId, errorCode);
462 
463             lifecycleManager.closeStream(stream, ctx.newSucceededFuture());
464         }
465 
466         @Override
467         public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
468             // Apply oldest outstanding local settings here. This is a synchronization point between endpoints.
469             Http2Settings settings = encoder.pollSentSettings();
470 
471             if (settings != null) {
472                 applyLocalSettings(settings);
473             }
474 
475             listener.onSettingsAckRead(ctx);
476         }
477 
478         /**
479          * Applies settings sent from the local endpoint.
480          * <p>
481          * This method is only called after the local settings have been acknowledged from the remote endpoint.
482          */
483         private void applyLocalSettings(Http2Settings settings) throws Http2Exception {
484             Boolean pushEnabled = settings.pushEnabled();
485             final Http2FrameReader.Configuration config = frameReader.configuration();
486             final Http2HeadersDecoder.Configuration headerConfig = config.headersConfiguration();
487             final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
488             if (pushEnabled != null) {
489                 if (connection.isServer()) {
490                     throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
491                 }
492                 connection.local().allowPushTo(pushEnabled);
493             }
494 
495             Long maxConcurrentStreams = settings.maxConcurrentStreams();
496             if (maxConcurrentStreams != null) {
497                 connection.remote().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
498             }
499 
500             Long headerTableSize = settings.headerTableSize();
501             if (headerTableSize != null) {
502                 headerConfig.maxHeaderTableSize(headerTableSize);
503             }
504 
505             Long maxHeaderListSize = settings.maxHeaderListSize();
506             if (maxHeaderListSize != null) {
507                 headerConfig.maxHeaderListSize(maxHeaderListSize, calculateMaxHeaderListSizeGoAway(maxHeaderListSize));
508             }
509 
510             Integer maxFrameSize = settings.maxFrameSize();
511             if (maxFrameSize != null) {
512                 frameSizePolicy.maxFrameSize(maxFrameSize);
513             }
514 
515             Integer initialWindowSize = settings.initialWindowSize();
516             if (initialWindowSize != null) {
517                 flowController().initialWindowSize(initialWindowSize);
518             }
519         }
520 
521         @Override
522         public void onSettingsRead(final ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
523             if (settingsReceivedConsumer == null) {
524                 // Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
525                 // remote peer applies these settings before any subsequent frames that we may send which depend upon
526                 // these new settings. See https://github.com/netty/netty/issues/6520.
527                 encoder.writeSettingsAck(ctx, ctx.newPromise());
528 
529                 encoder.remoteSettings(settings);
530             } else {
531                 settingsReceivedConsumer.consumeReceivedSettings(settings);
532             }
533 
534             listener.onSettingsRead(ctx, settings);
535         }
536 
537         @Override
538         public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
539             if (autoAckPing) {
540                 // Send an ack back to the remote client.
541                 encoder.writePing(ctx, true, data, ctx.newPromise());
542             }
543             listener.onPingRead(ctx, data);
544         }
545 
546         @Override
547         public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
548             listener.onPingAckRead(ctx, data);
549         }
550 
551         @Override
552         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
553                 Http2Headers headers, int padding) throws Http2Exception {
554             // A client cannot push.
555             if (connection().isServer()) {
556                 throw connectionError(PROTOCOL_ERROR, "A client cannot push.");
557             }
558 
559             Http2Stream parentStream = connection.stream(streamId);
560 
561             if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, parentStream, false, "PUSH_PROMISE")) {
562                 return;
563             }
564 
565             switch (parentStream.state()) {
566               case OPEN:
567               case HALF_CLOSED_LOCAL:
568                   // Allowed to receive push promise in these states.
569                   break;
570               default:
571                   // Connection error.
572                   throw connectionError(PROTOCOL_ERROR,
573                       "Stream %d in unexpected state for receiving push promise: %s",
574                       parentStream.id(), parentStream.state());
575             }
576 
577             if (!requestVerifier.isAuthoritative(ctx, headers)) {
578                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
579                         "Promised request on stream %d for promised stream %d is not authoritative",
580                         streamId, promisedStreamId);
581             }
582             if (!requestVerifier.isCacheable(headers)) {
583                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
584                         "Promised request on stream %d for promised stream %d is not known to be cacheable",
585                         streamId, promisedStreamId);
586             }
587             if (!requestVerifier.isSafe(headers)) {
588                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
589                         "Promised request on stream %d for promised stream %d is not known to be safe",
590                         streamId, promisedStreamId);
591             }
592 
593             // Reserve the push stream based with a priority based on the current stream's priority.
594             connection.remote().reservePushStream(promisedStreamId, parentStream);
595 
596             listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
597         }
598 
599         @Override
600         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
601                 throws Http2Exception {
602             onGoAwayRead0(ctx, lastStreamId, errorCode, debugData);
603         }
604 
605         @Override
606         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
607                 throws Http2Exception {
608             Http2Stream stream = connection.stream(streamId);
609             if (stream == null || stream.state() == CLOSED || streamCreatedAfterGoAwaySent(streamId)) {
610                 // Ignore this frame.
611                 verifyStreamMayHaveExisted(streamId, false, "WINDOW_UPDATE");
612                 return;
613             }
614 
615             // Update the outbound flow control window.
616             encoder.flowController().incrementWindowSize(stream, windowSizeIncrement);
617 
618             listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
619         }
620 
621         @Override
622         public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
623                 ByteBuf payload) throws Http2Exception {
624             onUnknownFrame0(ctx, frameType, streamId, flags, payload);
625         }
626 
627         /**
628          * Helper method to determine if a frame that has the semantics of headers or data should be ignored for the
629          * {@code stream} (which may be {@code null}) associated with {@code streamId}.
630          */
631         private boolean shouldIgnoreHeadersOrDataFrame(ChannelHandlerContext ctx, int streamId, Http2Stream stream,
632                 boolean endOfStream, String frameName) throws Http2Exception {
633             if (stream == null) {
634                 if (streamCreatedAfterGoAwaySent(streamId)) {
635                     logger.info("{} ignoring {} frame for stream {}. Stream sent after GOAWAY sent",
636                             ctx.channel(), frameName, streamId);
637                     return true;
638                 }
639 
640                 // Make sure it's not an out-of-order frame, like a rogue DATA frame, for a stream that could
641                 // never have existed.
642                 verifyStreamMayHaveExisted(streamId, endOfStream, frameName);
643 
644                 // Its possible that this frame would result in stream ID out of order creation (PROTOCOL ERROR) and its
645                 // also possible that this frame is received on a CLOSED stream (STREAM_CLOSED after a RST_STREAM is
646                 // sent). We don't have enough information to know for sure, so we choose the lesser of the two errors.
647                 throw streamError(streamId, STREAM_CLOSED, "Received %s frame for an unknown stream %d",
648                                   frameName, streamId);
649             }
650             if (stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
651                 // If we have sent a reset stream it is assumed the stream will be closed after the write completes.
652                 // If we have not sent a reset, but the stream was created after a GoAway this is not supported by
653                 // DefaultHttp2Connection and if a custom Http2Connection is used it is assumed the lifetime is managed
654                 // elsewhere so we don't close the stream or otherwise modify the stream's state.
655 
656                 if (logger.isInfoEnabled()) {
657                     logger.info("{} ignoring {} frame for stream {}", ctx.channel(), frameName,
658                             stream.isResetSent() ? "RST_STREAM sent." :
659                                     "Stream created after GOAWAY sent. Last known stream by peer " +
660                                      connection.remote().lastStreamKnownByPeer());
661                 }
662 
663                 return true;
664             }
665             return false;
666         }
667 
668         /**
669          * Helper method for determining whether or not to ignore inbound frames. A stream is considered to be created
670          * after a {@code GOAWAY} is sent if the following conditions hold:
671          * <p/>
672          * <ul>
673          *     <li>A {@code GOAWAY} must have been sent by the local endpoint</li>
674          *     <li>The {@code streamId} must identify a legitimate stream id for the remote endpoint to be creating</li>
675          *     <li>{@code streamId} is greater than the Last Known Stream ID which was sent by the local endpoint
676          *     in the last {@code GOAWAY} frame</li>
677          * </ul>
678          * <p/>
679          */
680         private boolean streamCreatedAfterGoAwaySent(int streamId) {
681             Endpoint<?> remote = connection.remote();
682             return connection.goAwaySent() && remote.isValidStreamId(streamId) &&
683                     streamId > remote.lastStreamKnownByPeer();
684         }
685 
686         private void verifyStreamMayHaveExisted(int streamId, boolean endOfStream, String frameName)
687                 throws Http2Exception {
688             if (!connection.streamMayHaveExisted(streamId)) {
689                 throw connectionError(PROTOCOL_ERROR,
690                         "Stream %d does not exist for inbound frame %s, endOfStream = %b",
691                         streamId, frameName, endOfStream);
692             }
693         }
694     }
695 
696     private final class PrefaceFrameListener implements Http2FrameListener {
697         /**
698          * Verifies that the HTTP/2 connection preface has been received from the remote endpoint.
699          * It is possible that the current call to
700          * {@link Http2FrameReader#readFrame(ChannelHandlerContext, ByteBuf, Http2FrameListener)} will have multiple
701          * frames to dispatch. So it may be OK for this class to get legitimate frames for the first readFrame.
702          */
703         private void verifyPrefaceReceived() throws Http2Exception {
704             if (!prefaceReceived()) {
705                 throw connectionError(PROTOCOL_ERROR, "Received non-SETTINGS as first frame.");
706             }
707         }
708 
709         @Override
710         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
711                 throws Http2Exception {
712             verifyPrefaceReceived();
713             return internalFrameListener.onDataRead(ctx, streamId, data, padding, endOfStream);
714         }
715 
716         @Override
717         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
718                 boolean endOfStream) throws Http2Exception {
719             verifyPrefaceReceived();
720             internalFrameListener.onHeadersRead(ctx, streamId, headers, padding, endOfStream);
721         }
722 
723         @Override
724         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
725                 short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
726             verifyPrefaceReceived();
727             internalFrameListener.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
728                     exclusive, padding, endOfStream);
729         }
730 
731         @Override
732         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
733                 boolean exclusive) throws Http2Exception {
734             verifyPrefaceReceived();
735             internalFrameListener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
736         }
737 
738         @Override
739         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
740             verifyPrefaceReceived();
741             internalFrameListener.onRstStreamRead(ctx, streamId, errorCode);
742         }
743 
744         @Override
745         public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
746             verifyPrefaceReceived();
747             internalFrameListener.onSettingsAckRead(ctx);
748         }
749 
750         @Override
751         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
752             // The first settings should change the internalFrameListener to the "real" listener
753             // that expects the preface to be verified.
754             if (!prefaceReceived()) {
755                 internalFrameListener = new FrameReadListener();
756             }
757             internalFrameListener.onSettingsRead(ctx, settings);
758         }
759 
760         @Override
761         public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
762             verifyPrefaceReceived();
763             internalFrameListener.onPingRead(ctx, data);
764         }
765 
766         @Override
767         public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
768             verifyPrefaceReceived();
769             internalFrameListener.onPingAckRead(ctx, data);
770         }
771 
772         @Override
773         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
774                 Http2Headers headers, int padding) throws Http2Exception {
775             verifyPrefaceReceived();
776             internalFrameListener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
777         }
778 
779         @Override
780         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
781                 throws Http2Exception {
782             onGoAwayRead0(ctx, lastStreamId, errorCode, debugData);
783         }
784 
785         @Override
786         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
787                 throws Http2Exception {
788             verifyPrefaceReceived();
789             internalFrameListener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
790         }
791 
792         @Override
793         public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
794                 ByteBuf payload) throws Http2Exception {
795             onUnknownFrame0(ctx, frameType, streamId, flags, payload);
796         }
797     }
798 
799     private static final class ContentLength {
800         private final long expected;
801         private long seen;
802 
803         ContentLength(long expected) {
804             this.expected = expected;
805         }
806 
807         void increaseReceivedBytes(boolean server, int streamId, int bytes, boolean isEnd) throws Http2Exception {
808             seen += bytes;
809             // Check for overflow
810             if (seen < 0) {
811                 throw streamError(streamId, PROTOCOL_ERROR,
812                         "Received amount of data did overflow and so not match content-length header %d", expected);
813             }
814             // Check if we received more data then what was advertised via the content-length header.
815             if (seen > expected) {
816                 throw streamError(streamId, PROTOCOL_ERROR,
817                         "Received amount of data %d does not match content-length header %d", seen, expected);
818             }
819 
820             if (isEnd) {
821                 if (seen == 0 && !server) {
822                     // This may be a response to a HEAD request, let's just allow it.
823                     return;
824                 }
825 
826                 // Check that we really saw what was told via the content-length header.
827                 if (expected > seen) {
828                     throw streamError(streamId, PROTOCOL_ERROR,
829                             "Received amount of data %d does not match content-length header %d", seen, expected);
830                 }
831             }
832         }
833     }
834 }