View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
18  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
19  import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
20  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
21  import static io.netty.handler.codec.http2.Http2Exception.streamError;
22  import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
23  import static io.netty.util.internal.ObjectUtil.checkNotNull;
24  import io.netty.buffer.ByteBuf;
25  import io.netty.channel.ChannelHandlerContext;
26  
27  import java.util.List;
28  
29  /**
30   * Provides the default implementation for processing inbound frame events and delegates to a
31   * {@link Http2FrameListener}
32   * <p>
33   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
34   * <p>
35   * This interface enforces inbound flow control functionality through
36   * {@link Http2LocalFlowController}
37   */
38  public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
39      private final Http2FrameListener internalFrameListener = new FrameReadListener();
40      private final Http2Connection connection;
41      private final Http2LifecycleManager lifecycleManager;
42      private final Http2ConnectionEncoder encoder;
43      private final Http2FrameReader frameReader;
44      private final Http2FrameListener listener;
45      private boolean prefaceReceived;
46  
47      /**
48       * Builder for instances of {@link DefaultHttp2ConnectionDecoder}.
49       */
50      public static class Builder implements Http2ConnectionDecoder.Builder {
51          private Http2Connection connection;
52          private Http2LifecycleManager lifecycleManager;
53          private Http2ConnectionEncoder encoder;
54          private Http2FrameReader frameReader;
55          private Http2FrameListener listener;
56  
57          @Override
58          public Builder connection(Http2Connection connection) {
59              this.connection = connection;
60              return this;
61          }
62  
63          @Override
64          public Builder lifecycleManager(Http2LifecycleManager lifecycleManager) {
65              this.lifecycleManager = lifecycleManager;
66              return this;
67          }
68  
69          @Override
70          public Http2LifecycleManager lifecycleManager() {
71              return lifecycleManager;
72          }
73  
74          @Override
75          public Builder frameReader(Http2FrameReader frameReader) {
76              this.frameReader = frameReader;
77              return this;
78          }
79  
80          @Override
81          public Builder listener(Http2FrameListener listener) {
82              this.listener = listener;
83              return this;
84          }
85  
86          @Override
87          public Builder encoder(Http2ConnectionEncoder encoder) {
88              this.encoder = encoder;
89              return this;
90          }
91  
92          @Override
93          public Http2ConnectionDecoder build() {
94              return new DefaultHttp2ConnectionDecoder(this);
95          }
96      }
97  
98      public static Builder newBuilder() {
99          return new Builder();
100     }
101 
102     protected DefaultHttp2ConnectionDecoder(Builder builder) {
103         connection = checkNotNull(builder.connection, "connection");
104         frameReader = checkNotNull(builder.frameReader, "frameReader");
105         lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager");
106         encoder = checkNotNull(builder.encoder, "encoder");
107         listener = checkNotNull(builder.listener, "listener");
108         if (connection.local().flowController() == null) {
109             connection.local().flowController(
110                     new DefaultHttp2LocalFlowController(connection, encoder.frameWriter()));
111         }
112     }
113 
114     @Override
115     public Http2Connection connection() {
116         return connection;
117     }
118 
119     @Override
120     public final Http2LocalFlowController flowController() {
121         return connection.local().flowController();
122     }
123 
124     @Override
125     public Http2FrameListener listener() {
126         return listener;
127     }
128 
129     @Override
130     public boolean prefaceReceived() {
131         return prefaceReceived;
132     }
133 
134     @Override
135     public void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Http2Exception {
136         frameReader.readFrame(ctx, in, internalFrameListener);
137     }
138 
139     @Override
140     public Http2Settings localSettings() {
141         Http2Settings settings = new Http2Settings();
142         Http2FrameReader.Configuration config = frameReader.configuration();
143         Http2HeaderTable headerTable = config.headerTable();
144         Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
145         settings.initialWindowSize(flowController().initialWindowSize());
146         settings.maxConcurrentStreams(connection.remote().maxStreams());
147         settings.headerTableSize(headerTable.maxHeaderTableSize());
148         settings.maxFrameSize(frameSizePolicy.maxFrameSize());
149         settings.maxHeaderListSize(headerTable.maxHeaderListSize());
150         if (!connection.isServer()) {
151             // Only set the pushEnabled flag if this is a client endpoint.
152             settings.pushEnabled(connection.local().allowPushTo());
153         }
154         return settings;
155     }
156 
157     @Override
158     public void localSettings(Http2Settings settings) throws Http2Exception {
159         Boolean pushEnabled = settings.pushEnabled();
160         Http2FrameReader.Configuration config = frameReader.configuration();
161         Http2HeaderTable inboundHeaderTable = config.headerTable();
162         Http2FrameSizePolicy inboundFrameSizePolicy = config.frameSizePolicy();
163         if (pushEnabled != null) {
164             if (connection.isServer()) {
165                 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
166             }
167             connection.local().allowPushTo(pushEnabled);
168         }
169 
170         Long maxConcurrentStreams = settings.maxConcurrentStreams();
171         if (maxConcurrentStreams != null) {
172             int value = (int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE);
173             connection.remote().maxStreams(value);
174         }
175 
176         Long headerTableSize = settings.headerTableSize();
177         if (headerTableSize != null) {
178             inboundHeaderTable.maxHeaderTableSize((int) Math.min(headerTableSize, Integer.MAX_VALUE));
179         }
180 
181         Integer maxHeaderListSize = settings.maxHeaderListSize();
182         if (maxHeaderListSize != null) {
183             inboundHeaderTable.maxHeaderListSize(maxHeaderListSize);
184         }
185 
186         Integer maxFrameSize = settings.maxFrameSize();
187         if (maxFrameSize != null) {
188             inboundFrameSizePolicy.maxFrameSize(maxFrameSize);
189         }
190 
191         Integer initialWindowSize = settings.initialWindowSize();
192         if (initialWindowSize != null) {
193             flowController().initialWindowSize(initialWindowSize);
194         }
195     }
196 
197     @Override
198     public void close() {
199         frameReader.close();
200     }
201 
202     private int unconsumedBytes(Http2Stream stream) {
203         return flowController().unconsumedBytes(stream);
204     }
205 
206     /**
207      * Handles all inbound frames from the network.
208      */
209     private final class FrameReadListener implements Http2FrameListener {
210 
211         @Override
212         public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data,
213                 int padding, boolean endOfStream) throws Http2Exception {
214             verifyPrefaceReceived();
215 
216             // Check if we received a data frame for a stream which is half-closed
217             Http2Stream stream = connection.requireStream(streamId);
218 
219             verifyGoAwayNotReceived();
220 
221             // We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a
222             // lower stream ID.
223             boolean shouldIgnore = shouldIgnoreFrame(stream, false);
224             Http2Exception error = null;
225             switch (stream.state()) {
226                 case OPEN:
227                 case HALF_CLOSED_LOCAL:
228                     break;
229                 case HALF_CLOSED_REMOTE:
230                     // Always fail the stream if we've more data after the remote endpoint half-closed.
231                     error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
232                         stream.id(), stream.state());
233                     break;
234                 case CLOSED:
235                     if (!shouldIgnore) {
236                         error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
237                                 stream.id(), stream.state());
238                     }
239                     break;
240                 default:
241                     if (!shouldIgnore) {
242                         error = streamError(stream.id(), PROTOCOL_ERROR,
243                                 "Stream %d in unexpected state: %s", stream.id(), stream.state());
244                     }
245                     break;
246             }
247 
248             int bytesToReturn = data.readableBytes() + padding;
249             int unconsumedBytes = unconsumedBytes(stream);
250             Http2LocalFlowController flowController = flowController();
251             try {
252                 // If we should apply flow control, do so now.
253                 flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
254                 // Update the unconsumed bytes after flow control is applied.
255                 unconsumedBytes = unconsumedBytes(stream);
256 
257                 // If we should ignore this frame, do so now.
258                 if (shouldIgnore) {
259                     return bytesToReturn;
260                 }
261 
262                 // If the stream was in an invalid state to receive the frame, throw the error.
263                 if (error != null) {
264                     throw error;
265                 }
266 
267                 // Call back the application and retrieve the number of bytes that have been
268                 // immediately processed.
269                 bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
270                 return bytesToReturn;
271             } catch (Http2Exception e) {
272                 // If an exception happened during delivery, the listener may have returned part
273                 // of the bytes before the error occurred. If that's the case, subtract that from
274                 // the total processed bytes so that we don't return too many bytes.
275                 int delta = unconsumedBytes - unconsumedBytes(stream);
276                 bytesToReturn -= delta;
277                 throw e;
278             } catch (RuntimeException e) {
279                 // If an exception happened during delivery, the listener may have returned part
280                 // of the bytes before the error occurred. If that's the case, subtract that from
281                 // the total processed bytes so that we don't return too many bytes.
282                 int delta = unconsumedBytes - unconsumedBytes(stream);
283                 bytesToReturn -= delta;
284                 throw e;
285             } finally {
286                 // If appropriate, returned the processed bytes to the flow controller.
287                 if (bytesToReturn > 0) {
288                     flowController.consumeBytes(ctx, stream, bytesToReturn);
289                 }
290 
291                 if (endOfStream) {
292                     lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
293                 }
294             }
295         }
296 
297         /**
298          * Verifies that the HTTP/2 connection preface has been received from the remote endpoint.
299          */
300         private void verifyPrefaceReceived() throws Http2Exception {
301             if (!prefaceReceived) {
302                 throw connectionError(PROTOCOL_ERROR, "Received non-SETTINGS as first frame.");
303             }
304         }
305 
306         @Override
307         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
308                 boolean endOfStream) throws Http2Exception {
309             onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream);
310         }
311 
312         @Override
313         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
314                 short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
315             verifyPrefaceReceived();
316 
317             Http2Stream stream = connection.stream(streamId);
318             verifyGoAwayNotReceived();
319             if (shouldIgnoreFrame(stream, false)) {
320                 // Ignore this frame.
321                 return;
322             }
323 
324             if (stream == null) {
325                 stream = connection.createRemoteStream(streamId).open(endOfStream);
326             } else {
327                 switch (stream.state()) {
328                     case RESERVED_REMOTE:
329                     case IDLE:
330                         stream.open(endOfStream);
331                         break;
332                     case OPEN:
333                     case HALF_CLOSED_LOCAL:
334                         // Allowed to receive headers in these states.
335                         break;
336                     case HALF_CLOSED_REMOTE:
337                     case CLOSED:
338                         // Stream error.
339                         throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
340                                 stream.id(), stream.state());
341                     default:
342                         // Connection error.
343                         throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),
344                                 stream.state());
345                 }
346             }
347 
348             listener.onHeadersRead(ctx, streamId, headers,
349                     streamDependency, weight, exclusive, padding, endOfStream);
350 
351             stream.setPriority(streamDependency, weight, exclusive);
352 
353             // If the headers completes this stream, close it.
354             if (endOfStream) {
355                 lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
356             }
357         }
358 
359         @Override
360         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
361                 boolean exclusive) throws Http2Exception {
362             verifyPrefaceReceived();
363 
364             Http2Stream stream = connection.stream(streamId);
365             verifyGoAwayNotReceived();
366             if (shouldIgnoreFrame(stream, true)) {
367                 // Ignore this frame.
368                 return;
369             }
370 
371             if (stream == null) {
372                 // PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the
373                 // first frame to be received for a stream that we must create the stream.
374                 stream = connection.createRemoteStream(streamId);
375             }
376 
377             // This call will create a stream for streamDependency if necessary.
378             // For this reason it must be done before notifying the listener.
379             stream.setPriority(streamDependency, weight, exclusive);
380 
381             listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
382         }
383 
384         @Override
385         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
386             verifyPrefaceReceived();
387 
388             Http2Stream stream = connection.requireStream(streamId);
389             if (stream.state() == CLOSED) {
390                 // RstStream frames must be ignored for closed streams.
391                 return;
392             }
393 
394             listener.onRstStreamRead(ctx, streamId, errorCode);
395 
396             lifecycleManager.closeStream(stream, ctx.newSucceededFuture());
397         }
398 
399         @Override
400         public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
401             verifyPrefaceReceived();
402             // Apply oldest outstanding local settings here. This is a synchronization point
403             // between endpoints.
404             Http2Settings settings = encoder.pollSentSettings();
405 
406             if (settings != null) {
407                 applyLocalSettings(settings);
408             }
409 
410             listener.onSettingsAckRead(ctx);
411         }
412 
413         /**
414          * Applies settings sent from the local endpoint.
415          */
416         private void applyLocalSettings(Http2Settings settings) throws Http2Exception {
417             Boolean pushEnabled = settings.pushEnabled();
418             final Http2FrameReader.Configuration config = frameReader.configuration();
419             final Http2HeaderTable headerTable = config.headerTable();
420             final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
421             if (pushEnabled != null) {
422                 if (connection.isServer()) {
423                     throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
424                 }
425                 connection.local().allowPushTo(pushEnabled);
426             }
427 
428             Long maxConcurrentStreams = settings.maxConcurrentStreams();
429             if (maxConcurrentStreams != null) {
430                 int value = (int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE);
431                 connection.remote().maxStreams(value);
432             }
433 
434             Long headerTableSize = settings.headerTableSize();
435             if (headerTableSize != null) {
436                 headerTable.maxHeaderTableSize((int) Math.min(headerTableSize, Integer.MAX_VALUE));
437             }
438 
439             Integer maxHeaderListSize = settings.maxHeaderListSize();
440             if (maxHeaderListSize != null) {
441                 headerTable.maxHeaderListSize(maxHeaderListSize);
442             }
443 
444             Integer maxFrameSize = settings.maxFrameSize();
445             if (maxFrameSize != null) {
446                 frameSizePolicy.maxFrameSize(maxFrameSize);
447             }
448 
449             Integer initialWindowSize = settings.initialWindowSize();
450             if (initialWindowSize != null) {
451                 flowController().initialWindowSize(initialWindowSize);
452             }
453         }
454 
455         @Override
456         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
457             encoder.remoteSettings(settings);
458 
459             // Acknowledge receipt of the settings.
460             encoder.writeSettingsAck(ctx, ctx.newPromise());
461 
462             // We've received at least one non-ack settings frame from the remote endpoint.
463             prefaceReceived = true;
464 
465             listener.onSettingsRead(ctx, settings);
466         }
467 
468         @Override
469         public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
470             verifyPrefaceReceived();
471 
472             // Send an ack back to the remote client.
473             // Need to retain the buffer here since it will be released after the write completes.
474             encoder.writePing(ctx, true, data.retain(), ctx.newPromise());
475             ctx.flush();
476 
477             listener.onPingRead(ctx, data);
478         }
479 
480         @Override
481         public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
482             verifyPrefaceReceived();
483 
484             listener.onPingAckRead(ctx, data);
485         }
486 
487         @Override
488         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
489                 Http2Headers headers, int padding) throws Http2Exception {
490             verifyPrefaceReceived();
491 
492             Http2Stream parentStream = connection.requireStream(streamId);
493             verifyGoAwayNotReceived();
494             if (shouldIgnoreFrame(parentStream, false)) {
495                 // Ignore frames for any stream created after we sent a go-away.
496                 return;
497             }
498 
499             switch (parentStream.state()) {
500               case OPEN:
501               case HALF_CLOSED_LOCAL:
502                   // Allowed to receive push promise in these states.
503                   break;
504               default:
505                   // Connection error.
506                   throw connectionError(PROTOCOL_ERROR,
507                       "Stream %d in unexpected state for receiving push promise: %s",
508                       parentStream.id(), parentStream.state());
509             }
510 
511             // Reserve the push stream based with a priority based on the current stream's priority.
512             connection.remote().reservePushStream(promisedStreamId, parentStream);
513 
514             listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
515         }
516 
517         @Override
518         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
519                 throws Http2Exception {
520             // Don't allow any more connections to be created.
521             connection.goAwayReceived(lastStreamId);
522 
523             listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
524         }
525 
526         @Override
527         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
528                 throws Http2Exception {
529             verifyPrefaceReceived();
530 
531             Http2Stream stream = connection.requireStream(streamId);
532             verifyGoAwayNotReceived();
533             if (stream.state() == CLOSED || shouldIgnoreFrame(stream, false)) {
534                 // Ignore frames for any stream created after we sent a go-away.
535                 return;
536             }
537 
538             // Update the outbound flow controller.
539             encoder.flowController().incrementWindowSize(ctx, stream, windowSizeIncrement);
540 
541             listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
542         }
543 
544         @Override
545         public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
546                 ByteBuf payload) {
547             listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
548         }
549 
550         /**
551          * Indicates whether or not frames for the given stream should be ignored based on the state of the
552          * stream/connection.
553          */
554         private boolean shouldIgnoreFrame(Http2Stream stream, boolean allowResetSent) {
555             if (connection.goAwaySent() &&
556                     (stream == null || connection.remote().lastStreamCreated() <= stream.id())) {
557                 // Frames from streams created after we sent a go-away should be ignored.
558                 // Frames for the connection stream ID (i.e. 0) will always be allowed.
559                 return true;
560             }
561 
562             // Also ignore inbound frames after we sent a RST_STREAM frame.
563             return stream != null && !allowResetSent && stream.isResetSent();
564         }
565 
566         /**
567          * Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws a
568          * connection error.
569          */
570         private void verifyGoAwayNotReceived() throws Http2Exception {
571             if (connection.goAwayReceived()) {
572                 // Connection error.
573                 throw connectionError(PROTOCOL_ERROR, "Received frames after receiving GO_AWAY");
574             }
575         }
576     }
577 }