View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.handler.codec.http2;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.channel.ChannelHandlerContext;
19  import io.netty5.handler.codec.http.HttpHeaderNames;
20  import io.netty5.handler.codec.http.HttpStatusClass;
21  import io.netty5.handler.codec.http.HttpUtil;
22  import io.netty5.handler.codec.http2.Http2Connection.Endpoint;
23  import io.netty5.util.internal.UnstableApi;
24  import io.netty5.util.internal.logging.InternalLogger;
25  import io.netty5.util.internal.logging.InternalLoggerFactory;
26  
27  import java.util.List;
28  
29  import static io.netty5.handler.codec.http.HttpStatusClass.INFORMATIONAL;
30  import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
31  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
32  import static io.netty5.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
33  import static io.netty5.handler.codec.http2.Http2Error.STREAM_CLOSED;
34  import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
35  import static io.netty5.handler.codec.http2.Http2Exception.streamError;
36  import static io.netty5.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY;
37  import static io.netty5.handler.codec.http2.Http2Stream.State.CLOSED;
38  import static io.netty5.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
39  import static java.lang.Integer.MAX_VALUE;
40  import static java.lang.Math.min;
41  import static java.util.Objects.requireNonNull;
42  
43  /**
44   * Provides the default implementation for processing inbound frame events and delegates to a
45   * {@link Http2FrameListener}
46   * <p>
47   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
48   * <p>
49   * This class enforces inbound flow control functionality through
50   * {@link Http2LocalFlowController}
51   */
52  @UnstableApi
53  public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
54      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2ConnectionDecoder.class);
55      private Http2FrameListener internalFrameListener = new PrefaceFrameListener();
56      private final Http2Connection connection;
57      private Http2LifecycleManager lifecycleManager;
58      private final Http2ConnectionEncoder encoder;
59      private final Http2FrameReader frameReader;
60      private Http2FrameListener listener;
61      private final Http2PromisedRequestVerifier requestVerifier;
62      private final Http2SettingsReceivedConsumer settingsReceivedConsumer;
63      private final boolean autoAckPing;
64      private final Http2Connection.PropertyKey contentLengthKey;
65  
66      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
67                                           Http2ConnectionEncoder encoder,
68                                           Http2FrameReader frameReader) {
69          this(connection, encoder, frameReader, ALWAYS_VERIFY);
70      }
71  
72      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
73                                           Http2ConnectionEncoder encoder,
74                                           Http2FrameReader frameReader,
75                                           Http2PromisedRequestVerifier requestVerifier) {
76          this(connection, encoder, frameReader, requestVerifier, true);
77      }
78  
79      /**
80       * Create a new instance.
81       * @param connection The {@link Http2Connection} associated with this decoder.
82       * @param encoder The {@link Http2ConnectionEncoder} associated with this decoder.
83       * @param frameReader Responsible for reading/parsing the raw frames. As opposed to this object which applies
84       *                    h2 semantics on top of the frames.
85       * @param requestVerifier Determines if push promised streams are valid.
86       * @param autoAckSettings {@code false} to disable automatically applying and sending settings acknowledge frame.
87       *  The {@code Http2ConnectionEncoder} is expected to be an instance of {@link Http2SettingsReceivedConsumer} and
88       *  will apply the earliest received but not yet ACKed SETTINGS when writing the SETTINGS ACKs.
89       * {@code true} to enable automatically applying and sending settings acknowledge frame.
90       */
91      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
92                                           Http2ConnectionEncoder encoder,
93                                           Http2FrameReader frameReader,
94                                           Http2PromisedRequestVerifier requestVerifier,
95                                           boolean autoAckSettings) {
96          this(connection, encoder, frameReader, requestVerifier, autoAckSettings, true);
97      }
98  
99      /**
100      * Create a new instance.
101      * @param connection The {@link Http2Connection} associated with this decoder.
102      * @param encoder The {@link Http2ConnectionEncoder} associated with this decoder.
103      * @param frameReader Responsible for reading/parsing the raw frames. As opposed to this object which applies
104      *                    h2 semantics on top of the frames.
105      * @param requestVerifier Determines if push promised streams are valid.
106      * @param autoAckSettings {@code false} to disable automatically applying and sending settings acknowledge frame.
107      *                        The {@code Http2ConnectionEncoder} is expected to be an instance of
108      *                        {@link Http2SettingsReceivedConsumer} and will apply the earliest received but not yet
109      *                        ACKed SETTINGS when writing the SETTINGS ACKs. {@code true} to enable automatically
110      *                        applying and sending settings acknowledge frame.
111      * @param autoAckPing {@code false} to disable automatically sending ping acknowledge frame. {@code true} to enable
112      *                    automatically sending ping ack frame.
113      */
114     public DefaultHttp2ConnectionDecoder(Http2Connection connection,
115                                          Http2ConnectionEncoder encoder,
116                                          Http2FrameReader frameReader,
117                                          Http2PromisedRequestVerifier requestVerifier,
118                                          boolean autoAckSettings,
119                                          boolean autoAckPing) {
120         this.autoAckPing = autoAckPing;
121         if (autoAckSettings) {
122             settingsReceivedConsumer = null;
123         } else {
124             if (!(encoder instanceof Http2SettingsReceivedConsumer)) {
125                 throw new IllegalArgumentException("disabling autoAckSettings requires the encoder to be a " +
126                         Http2SettingsReceivedConsumer.class);
127             }
128             settingsReceivedConsumer = (Http2SettingsReceivedConsumer) encoder;
129         }
130         this.connection = requireNonNull(connection, "connection");
131         contentLengthKey = this.connection.newKey();
132         this.frameReader = requireNonNull(frameReader, "frameReader");
133         this.encoder = requireNonNull(encoder, "encoder");
134         this.requestVerifier = requireNonNull(requestVerifier, "requestVerifier");
135         if (connection.local().flowController() == null) {
136             connection.local().flowController(new DefaultHttp2LocalFlowController(connection));
137         }
138         connection.local().flowController().frameWriter(encoder.frameWriter());
139     }
140 
141     @Override
142     public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
143         this.lifecycleManager = requireNonNull(lifecycleManager, "lifecycleManager");
144     }
145 
146     @Override
147     public Http2Connection connection() {
148         return connection;
149     }
150 
151     @Override
152     public final Http2LocalFlowController flowController() {
153         return connection.local().flowController();
154     }
155 
156     @Override
157     public void frameListener(Http2FrameListener listener) {
158         this.listener = requireNonNull(listener, "listener");
159     }
160 
161     @Override
162     public Http2FrameListener frameListener() {
163         return listener;
164     }
165 
166     @Override
167     public boolean prefaceReceived() {
168         return FrameReadListener.class == internalFrameListener.getClass();
169     }
170 
171     @Override
172     public void decodeFrame(ChannelHandlerContext ctx, Buffer in) throws Http2Exception {
173         frameReader.readFrame(ctx, in, internalFrameListener);
174     }
175 
176     @Override
177     public Http2Settings localSettings() {
178         Http2Settings settings = new Http2Settings();
179         Http2FrameReader.Configuration config = frameReader.configuration();
180         Http2HeadersDecoder.Configuration headersConfig = config.headersConfiguration();
181         Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
182         settings.initialWindowSize(flowController().initialWindowSize());
183         settings.maxConcurrentStreams(connection.remote().maxActiveStreams());
184         settings.headerTableSize(headersConfig.maxHeaderTableSize());
185         settings.maxFrameSize(frameSizePolicy.maxFrameSize());
186         settings.maxHeaderListSize(headersConfig.maxHeaderListSize());
187         if (!connection.isServer()) {
188             // Only set the pushEnabled flag if this is a client endpoint.
189             settings.pushEnabled(connection.local().allowPushTo());
190         }
191         return settings;
192     }
193 
194     @Override
195     public void close() {
196         frameReader.close();
197     }
198 
199     /**
200      * Calculate the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount.
201      * @param maxHeaderListSize
202      *      <a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_MAX_HEADER_LIST_SIZE</a> for the local
203      *      endpoint.
204      * @return the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount.
205      */
206     protected long calculateMaxHeaderListSizeGoAway(long maxHeaderListSize) {
207         return Http2CodecUtil.calculateMaxHeaderListSizeGoAway(maxHeaderListSize);
208     }
209 
210     private int unconsumedBytes(Http2Stream stream) {
211         return flowController().unconsumedBytes(stream);
212     }
213 
214     void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, Buffer debugData)
215             throws Http2Exception {
216         listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData.copy(true));
217         connection.goAwayReceived(lastStreamId, errorCode, debugData);
218     }
219 
220     void onUnknownFrame0(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, Buffer payload)
221             throws Http2Exception {
222         listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
223     }
224 
225     // See https://tools.ietf.org/html/rfc7540#section-8.1.2.6
226     private void verifyContentLength(Http2Stream stream, int data, boolean isEnd) throws Http2Exception {
227         ContentLength contentLength = stream.getProperty(contentLengthKey);
228         if (contentLength != null) {
229             try {
230                 contentLength.increaseReceivedBytes(connection.isServer(), stream.id(), data, isEnd);
231             } finally {
232                 if (isEnd) {
233                     stream.removeProperty(contentLengthKey);
234                 }
235             }
236         }
237     }
238 
239     /**
240      * Handles all inbound frames from the network.
241      */
242     private final class FrameReadListener implements Http2FrameListener {
243         @Override
244         public int onDataRead(final ChannelHandlerContext ctx, int streamId, Buffer data, int padding,
245                               boolean endOfStream) throws Http2Exception {
246             Http2Stream stream = connection.stream(streamId);
247             Http2LocalFlowController flowController = flowController();
248             int readable = data.readableBytes();
249             int bytesToReturn = readable + padding;
250 
251             final boolean shouldIgnore;
252             try {
253                 shouldIgnore = shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "DATA");
254             } catch (Http2Exception e) {
255                 // Ignoring this frame. We still need to count the frame towards the connection flow control
256                 // window, but we immediately mark all bytes as consumed.
257                 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
258                 flowController.consumeBytes(stream, bytesToReturn);
259                 throw e;
260             } catch (Throwable t) {
261                 throw connectionError(INTERNAL_ERROR, t, "Unhandled error on data stream id %d", streamId);
262             }
263 
264             if (shouldIgnore) {
265                 // Ignoring this frame. We still need to count the frame towards the connection flow control
266                 // window, but we immediately mark all bytes as consumed.
267                 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
268                 flowController.consumeBytes(stream, bytesToReturn);
269 
270                 // Verify that the stream may have existed after we apply flow control.
271                 verifyStreamMayHaveExisted(streamId);
272 
273                 // All bytes have been consumed.
274                 return bytesToReturn;
275             }
276             Http2Exception error = null;
277             switch (stream.state()) {
278                 case OPEN:
279                 case HALF_CLOSED_LOCAL:
280                     break;
281                 case HALF_CLOSED_REMOTE:
282                 case CLOSED:
283                     error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
284                         stream.id(), stream.state());
285                     break;
286                 default:
287                     error = streamError(stream.id(), PROTOCOL_ERROR,
288                         "Stream %d in unexpected state: %s", stream.id(), stream.state());
289                     break;
290             }
291 
292             int unconsumedBytes = unconsumedBytes(stream);
293             try {
294                 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
295                 // Update the unconsumed bytes after flow control is applied.
296                 unconsumedBytes = unconsumedBytes(stream);
297 
298                 // If the stream is in an invalid state to receive the frame, throw the error.
299                 if (error != null) {
300                     throw error;
301                 }
302 
303                 verifyContentLength(stream, readable, endOfStream);
304 
305                 // Call back the application and retrieve the number of bytes that have been
306                 // immediately processed.
307                 bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
308 
309                 if (endOfStream) {
310                     lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
311                 }
312 
313                 return bytesToReturn;
314             } catch (Http2Exception | RuntimeException e) {
315                 // If an exception happened during delivery, the listener may have returned part
316                 // of the bytes before the error occurred. If that's the case, subtract that from
317                 // the total processed bytes so that we don't return too many bytes.
318                 int delta = unconsumedBytes - unconsumedBytes(stream);
319                 bytesToReturn -= delta;
320                 throw e;
321             } finally {
322                 // If appropriate, return the processed bytes to the flow controller.
323                 flowController.consumeBytes(stream, bytesToReturn);
324             }
325         }
326 
327         @Override
328         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
329                 boolean endOfStream) throws Http2Exception {
330             onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream);
331         }
332 
333         @Override
334         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
335                 short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
336             Http2Stream stream = connection.stream(streamId);
337             boolean allowHalfClosedRemote = false;
338             boolean isTrailers = false;
339             if (stream == null && !connection.streamMayHaveExisted(streamId)) {
340                 stream = connection.remote().createStream(streamId, endOfStream);
341                 // Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.
342                 allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;
343             } else if (stream != null) {
344                 isTrailers = stream.isHeadersReceived();
345             }
346 
347             if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "HEADERS")) {
348                 return;
349             }
350 
351             boolean isInformational = !connection.isServer() &&
352                     HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
353             if ((isInformational || !endOfStream) && stream.isHeadersReceived() || stream.isTrailersReceived()) {
354                 throw streamError(streamId, PROTOCOL_ERROR,
355                                   "Stream %d received too many headers EOS: %s state: %s",
356                                   streamId, endOfStream, stream.state());
357             }
358 
359             switch (stream.state()) {
360                 case RESERVED_REMOTE:
361                     stream.open(endOfStream);
362                     break;
363                 case OPEN:
364                 case HALF_CLOSED_LOCAL:
365                     // Allowed to receive headers in these states.
366                     break;
367                 case HALF_CLOSED_REMOTE:
368                     if (!allowHalfClosedRemote) {
369                         throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
370                                 stream.id(), stream.state());
371                     }
372                     break;
373                 case CLOSED:
374                     throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
375                             stream.id(), stream.state());
376                 default:
377                     // Connection error.
378                     throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),
379                             stream.state());
380             }
381 
382             if (!isTrailers) {
383                 // extract the content-length header
384                 List<? extends CharSequence> contentLength = headers.getAll(HttpHeaderNames.CONTENT_LENGTH);
385                 if (contentLength != null && !contentLength.isEmpty()) {
386                     try {
387                         long cLength = HttpUtil.normalizeAndGetContentLength(contentLength, false, true);
388                         if (cLength != -1) {
389                             headers.setLong(HttpHeaderNames.CONTENT_LENGTH, cLength);
390                             stream.setProperty(contentLengthKey, new ContentLength(cLength));
391                         }
392                     } catch (IllegalArgumentException e) {
393                         throw streamError(stream.id(), PROTOCOL_ERROR, e,
394                                 "Multiple content-length headers received");
395                     }
396                 }
397             }
398 
399             stream.headersReceived(isInformational);
400             verifyContentLength(stream, 0, endOfStream);
401             encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
402             listener.onHeadersRead(ctx, streamId, headers, streamDependency,
403                     weight, exclusive, padding, endOfStream);
404             // If the headers completes this stream, close it.
405             if (endOfStream) {
406                 lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
407             }
408         }
409 
410         @Override
411         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
412                 boolean exclusive) throws Http2Exception {
413             encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
414 
415             listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
416         }
417 
418         @Override
419         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
420             Http2Stream stream = connection.stream(streamId);
421             if (stream == null) {
422                 verifyStreamMayHaveExisted(streamId);
423                 return;
424             }
425 
426             switch(stream.state()) {
427             case IDLE:
428                 throw connectionError(PROTOCOL_ERROR, "RST_STREAM received for IDLE stream %d", streamId);
429             case CLOSED:
430                 return; // RST_STREAM frames must be ignored for closed streams.
431             default:
432                 break;
433             }
434 
435             listener.onRstStreamRead(ctx, streamId, errorCode);
436 
437             lifecycleManager.closeStream(stream, ctx.newSucceededFuture());
438         }
439 
440         @Override
441         public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
442             // Apply oldest outstanding local settings here. This is a synchronization point between endpoints.
443             Http2Settings settings = encoder.pollSentSettings();
444 
445             if (settings != null) {
446                 applyLocalSettings(settings);
447             }
448 
449             listener.onSettingsAckRead(ctx);
450         }
451 
452         /**
453          * Applies settings sent from the local endpoint.
454          * <p>
455          * This method is only called after the local settings have been acknowledged from the remote endpoint.
456          */
457         private void applyLocalSettings(Http2Settings settings) throws Http2Exception {
458             Boolean pushEnabled = settings.pushEnabled();
459             final Http2FrameReader.Configuration config = frameReader.configuration();
460             final Http2HeadersDecoder.Configuration headerConfig = config.headersConfiguration();
461             final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
462             if (pushEnabled != null) {
463                 if (connection.isServer()) {
464                     throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
465                 }
466                 connection.local().allowPushTo(pushEnabled);
467             }
468 
469             Long maxConcurrentStreams = settings.maxConcurrentStreams();
470             if (maxConcurrentStreams != null) {
471                 connection.remote().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
472             }
473 
474             Long headerTableSize = settings.headerTableSize();
475             if (headerTableSize != null) {
476                 headerConfig.maxHeaderTableSize(headerTableSize);
477             }
478 
479             Long maxHeaderListSize = settings.maxHeaderListSize();
480             if (maxHeaderListSize != null) {
481                 headerConfig.maxHeaderListSize(maxHeaderListSize, calculateMaxHeaderListSizeGoAway(maxHeaderListSize));
482             }
483 
484             Integer maxFrameSize = settings.maxFrameSize();
485             if (maxFrameSize != null) {
486                 frameSizePolicy.maxFrameSize(maxFrameSize);
487             }
488 
489             Integer initialWindowSize = settings.initialWindowSize();
490             if (initialWindowSize != null) {
491                 flowController().initialWindowSize(initialWindowSize);
492             }
493         }
494 
495         @Override
496         public void onSettingsRead(final ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
497             if (settingsReceivedConsumer == null) {
498                 // Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
499                 // remote peer applies these settings before any subsequent frames that we may send which depend upon
500                 // these new settings. See https://github.com/netty/netty/issues/6520.
501                 encoder.writeSettingsAck(ctx);
502 
503                 encoder.remoteSettings(settings);
504             } else {
505                 settingsReceivedConsumer.consumeReceivedSettings(settings);
506             }
507 
508             listener.onSettingsRead(ctx, settings);
509         }
510 
511         @Override
512         public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
513             if (autoAckPing) {
514                 // Send an ack back to the remote client.
515                 encoder.writePing(ctx, true, data);
516             }
517             listener.onPingRead(ctx, data);
518         }
519 
520         @Override
521         public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
522             listener.onPingAckRead(ctx, data);
523         }
524 
525         @Override
526         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
527                 Http2Headers headers, int padding) throws Http2Exception {
528             // A client cannot push.
529             if (connection().isServer()) {
530                 throw connectionError(PROTOCOL_ERROR, "A client cannot push.");
531             }
532 
533             Http2Stream parentStream = connection.stream(streamId);
534 
535             if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, parentStream, "PUSH_PROMISE")) {
536                 return;
537             }
538 
539             switch (parentStream.state()) {
540               case OPEN:
541               case HALF_CLOSED_LOCAL:
542                   // Allowed to receive push promise in these states.
543                   break;
544               default:
545                   // Connection error.
546                   throw connectionError(PROTOCOL_ERROR,
547                       "Stream %d in unexpected state for receiving push promise: %s",
548                       parentStream.id(), parentStream.state());
549             }
550 
551             if (!requestVerifier.isAuthoritative(ctx, headers)) {
552                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
553                         "Promised request on stream %d for promised stream %d is not authoritative",
554                         streamId, promisedStreamId);
555             }
556             if (!requestVerifier.isCacheable(headers)) {
557                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
558                         "Promised request on stream %d for promised stream %d is not known to be cacheable",
559                         streamId, promisedStreamId);
560             }
561             if (!requestVerifier.isSafe(headers)) {
562                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
563                         "Promised request on stream %d for promised stream %d is not known to be safe",
564                         streamId, promisedStreamId);
565             }
566 
567             // Reserve the push stream based with a priority based on the current stream's priority.
568             connection.remote().reservePushStream(promisedStreamId, parentStream);
569 
570             listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
571         }
572 
573         @Override
574         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, Buffer debugData)
575                 throws Http2Exception {
576             onGoAwayRead0(ctx, lastStreamId, errorCode, debugData);
577         }
578 
579         @Override
580         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
581                 throws Http2Exception {
582             Http2Stream stream = connection.stream(streamId);
583             if (stream == null || stream.state() == CLOSED || streamCreatedAfterGoAwaySent(streamId)) {
584                 // Ignore this frame.
585                 verifyStreamMayHaveExisted(streamId);
586                 return;
587             }
588 
589             // Update the outbound flow control window.
590             encoder.flowController().incrementWindowSize(stream, windowSizeIncrement);
591 
592             listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
593         }
594 
595         @Override
596         public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
597                                    Buffer payload) throws Http2Exception {
598             onUnknownFrame0(ctx, frameType, streamId, flags, payload);
599         }
600 
601         /**
602          * Helper method to determine if a frame that has the semantics of headers or data should be ignored for the
603          * {@code stream} (which may be {@code null}) associated with {@code streamId}.
604          */
605         private boolean shouldIgnoreHeadersOrDataFrame(ChannelHandlerContext ctx, int streamId, Http2Stream stream,
606                 String frameName) throws Http2Exception {
607             if (stream == null) {
608                 if (streamCreatedAfterGoAwaySent(streamId)) {
609                     logger.info("{} ignoring {} frame for stream {}. Stream sent after GOAWAY sent",
610                             ctx.channel(), frameName, streamId);
611                     return true;
612                 }
613 
614                 // Make sure it's not an out-of-order frame, like a rogue DATA frame, for a stream that could
615                 // never have existed.
616                 verifyStreamMayHaveExisted(streamId);
617 
618                 // Its possible that this frame would result in stream ID out of order creation (PROTOCOL ERROR) and its
619                 // also possible that this frame is received on a CLOSED stream (STREAM_CLOSED after a RST_STREAM is
620                 // sent). We don't have enough information to know for sure, so we choose the lesser of the two errors.
621                 throw streamError(streamId, STREAM_CLOSED, "Received %s frame for an unknown stream %d",
622                                   frameName, streamId);
623             }
624             if (stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
625                 // If we have sent a reset stream it is assumed the stream will be closed after the write completes.
626                 // If we have not sent a reset, but the stream was created after a GoAway this is not supported by
627                 // DefaultHttp2Connection and if a custom Http2Connection is used it is assumed the lifetime is managed
628                 // elsewhere so we don't close the stream or otherwise modify the stream's state.
629 
630                 if (logger.isInfoEnabled()) {
631                     logger.info("{} ignoring {} frame for stream {}", ctx.channel(), frameName,
632                             stream.isResetSent() ? "RST_STREAM sent." :
633                                     "Stream created after GOAWAY sent. Last known stream by peer " +
634                                      connection.remote().lastStreamKnownByPeer());
635                 }
636 
637                 return true;
638             }
639             return false;
640         }
641 
642         /**
643          * Helper method for determining whether or not to ignore inbound frames. A stream is considered to be created
644          * after a {@code GOAWAY} is sent if the following conditions hold:
645          * <p/>
646          * <ul>
647          *     <li>A {@code GOAWAY} must have been sent by the local endpoint</li>
648          *     <li>The {@code streamId} must identify a legitimate stream id for the remote endpoint to be creating</li>
649          *     <li>{@code streamId} is greater than the Last Known Stream ID which was sent by the local endpoint
650          *     in the last {@code GOAWAY} frame</li>
651          * </ul>
652          * <p/>
653          */
654         private boolean streamCreatedAfterGoAwaySent(int streamId) {
655             Endpoint<?> remote = connection.remote();
656             return connection.goAwaySent() && remote.isValidStreamId(streamId) &&
657                     streamId > remote.lastStreamKnownByPeer();
658         }
659 
660         private void verifyStreamMayHaveExisted(int streamId) throws Http2Exception {
661             if (!connection.streamMayHaveExisted(streamId)) {
662                 throw connectionError(PROTOCOL_ERROR, "Stream %d does not exist", streamId);
663             }
664         }
665     }
666 
667     private final class PrefaceFrameListener implements Http2FrameListener {
668         /**
669          * Verifies that the HTTP/2 connection preface has been received from the remote endpoint.
670          * It is possible that the current call to
671          * {@link Http2FrameReader#readFrame(ChannelHandlerContext, Buffer, Http2FrameListener)} will have multiple
672          * frames to dispatch. So it may be OK for this class to get legitimate frames for the first readFrame.
673          */
674         private void verifyPrefaceReceived() throws Http2Exception {
675             if (!prefaceReceived()) {
676                 throw connectionError(PROTOCOL_ERROR, "Received non-SETTINGS as first frame.");
677             }
678         }
679 
680         @Override
681         public int onDataRead(ChannelHandlerContext ctx, int streamId, Buffer data, int padding, boolean endOfStream)
682                 throws Http2Exception {
683             verifyPrefaceReceived();
684             return internalFrameListener.onDataRead(ctx, streamId, data, padding, endOfStream);
685         }
686 
687         @Override
688         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
689                 boolean endOfStream) throws Http2Exception {
690             verifyPrefaceReceived();
691             internalFrameListener.onHeadersRead(ctx, streamId, headers, padding, endOfStream);
692         }
693 
694         @Override
695         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
696                 short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
697             verifyPrefaceReceived();
698             internalFrameListener.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
699                     exclusive, padding, endOfStream);
700         }
701 
702         @Override
703         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
704                 boolean exclusive) throws Http2Exception {
705             verifyPrefaceReceived();
706             internalFrameListener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
707         }
708 
709         @Override
710         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
711             verifyPrefaceReceived();
712             internalFrameListener.onRstStreamRead(ctx, streamId, errorCode);
713         }
714 
715         @Override
716         public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
717             verifyPrefaceReceived();
718             internalFrameListener.onSettingsAckRead(ctx);
719         }
720 
721         @Override
722         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
723             // The first settings should change the internalFrameListener to the "real" listener
724             // that expects the preface to be verified.
725             if (!prefaceReceived()) {
726                 internalFrameListener = new FrameReadListener();
727             }
728             internalFrameListener.onSettingsRead(ctx, settings);
729         }
730 
731         @Override
732         public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
733             verifyPrefaceReceived();
734             internalFrameListener.onPingRead(ctx, data);
735         }
736 
737         @Override
738         public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
739             verifyPrefaceReceived();
740             internalFrameListener.onPingAckRead(ctx, data);
741         }
742 
743         @Override
744         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
745                 Http2Headers headers, int padding) throws Http2Exception {
746             verifyPrefaceReceived();
747             internalFrameListener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
748         }
749 
750         @Override
751         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, Buffer debugData)
752                 throws Http2Exception {
753             onGoAwayRead0(ctx, lastStreamId, errorCode, debugData);
754         }
755 
756         @Override
757         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
758                 throws Http2Exception {
759             verifyPrefaceReceived();
760             internalFrameListener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
761         }
762 
763         @Override
764         public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
765                                    Buffer payload) throws Http2Exception {
766             onUnknownFrame0(ctx, frameType, streamId, flags, payload);
767         }
768     }
769 
770     private static final class ContentLength {
771         private final long expected;
772         private long seen;
773 
774         ContentLength(long expected) {
775             this.expected = expected;
776         }
777 
778         void increaseReceivedBytes(boolean server, int streamId, int bytes, boolean isEnd) throws Http2Exception {
779             seen += bytes;
780             // Check for overflow
781             if (seen < 0) {
782                 throw streamError(streamId, PROTOCOL_ERROR,
783                         "Received amount of data did overflow and so not match content-length header %d", expected);
784             }
785             // Check if we received more data then what was advertised via the content-length header.
786             if (seen > expected) {
787                 throw streamError(streamId, PROTOCOL_ERROR,
788                         "Received amount of data %d does not match content-length header %d", seen, expected);
789             }
790 
791             if (isEnd) {
792                 if (seen == 0 && !server) {
793                     // This may be a response to a HEAD request, let's just allow it.
794                     return;
795                 }
796 
797                 // Check that we really saw what was told via the content-length header.
798                 if (expected > seen) {
799                     throw streamError(streamId, PROTOCOL_ERROR,
800                             "Received amount of data %d does not match content-length header %d", seen, expected);
801                 }
802             }
803         }
804     }
805 }