View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.http.FullHttpMessage;
21  import io.netty.handler.codec.http.FullHttpRequest;
22  import io.netty.handler.codec.http.FullHttpResponse;
23  import io.netty.handler.codec.http.HttpHeaderNames;
24  import io.netty.handler.codec.http.HttpStatusClass;
25  import io.netty.handler.codec.http.HttpUtil;
26  
27  import static io.netty.handler.codec.http2.Http2Error.ENHANCE_YOUR_CALM;
28  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
29  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
30  import static io.netty.handler.codec.http2.Http2Exception.streamError;
31  import static io.netty.handler.codec.http.HttpResponseStatus.OK;
32  import static io.netty.util.internal.ObjectUtil.checkNotNull;
33  import static io.netty.util.internal.ObjectUtil.checkPositive;
34  
35  /**
36   * This adapter provides just header/data events from the HTTP message flow defined
37   * in <a href="https://tools.ietf.org/html/rfc7540#section-8.1">[RFC 7540], Section 8.1</a>.
38   * <p>
39   * See {@link HttpToHttp2ConnectionHandler} to get translation from HTTP/1.x objects to HTTP/2 frames for writes.
40   */
41  public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
42      private static final ImmediateSendDetector DEFAULT_SEND_DETECTOR = new ImmediateSendDetector() {
43          @Override
44          public boolean mustSendImmediately(FullHttpMessage msg) {
45              if (msg instanceof FullHttpResponse) {
46                  return ((FullHttpResponse) msg).status().codeClass() == HttpStatusClass.INFORMATIONAL;
47              }
48              if (msg instanceof FullHttpRequest) {
49                  return msg.headers().contains(HttpHeaderNames.EXPECT);
50              }
51              return false;
52          }
53  
54          @Override
55          public FullHttpMessage copyIfNeeded(ByteBufAllocator allocator, FullHttpMessage msg) {
56              if (msg instanceof FullHttpRequest) {
57                  FullHttpRequest copy = ((FullHttpRequest) msg).replace(allocator.buffer(0));
58                  copy.headers().remove(HttpHeaderNames.EXPECT);
59                  return copy;
60              }
61              return null;
62          }
63      };
64  
65      private final int maxContentLength;
66      private final ImmediateSendDetector sendDetector;
67      private final Http2Connection.PropertyKey messageKey;
68      private final boolean propagateSettings;
69      protected final Http2Connection connection;
70      protected final boolean validateHttpHeaders;
71  
72      protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
73                                          boolean validateHttpHeaders, boolean propagateSettings) {
74          this.connection = checkNotNull(connection, "connection");
75          this.maxContentLength = checkPositive(maxContentLength, "maxContentLength");
76          this.validateHttpHeaders = validateHttpHeaders;
77          this.propagateSettings = propagateSettings;
78          sendDetector = DEFAULT_SEND_DETECTOR;
79          messageKey = connection.newKey();
80      }
81  
82      /**
83       * The stream is out of scope for the HTTP message flow and will no longer be tracked
84       * @param stream The stream to remove associated state with
85       * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
86       */
87      protected final void removeMessage(Http2Stream stream, boolean release) {
88          FullHttpMessage msg = stream.removeProperty(messageKey);
89          if (release && msg != null) {
90              msg.release();
91          }
92      }
93  
94      /**
95       * Get the {@link FullHttpMessage} associated with {@code stream}.
96       * @param stream The stream to get the associated state from
97       * @return The {@link FullHttpMessage} associated with {@code stream}.
98       */
99      protected final FullHttpMessage getMessage(Http2Stream stream) {
100         return (FullHttpMessage) stream.getProperty(messageKey);
101     }
102 
103     /**
104      * Make {@code message} be the state associated with {@code stream}.
105      * @param stream The stream which {@code message} is associated with.
106      * @param message The message which contains the HTTP semantics.
107      */
108     protected final void putMessage(Http2Stream stream, FullHttpMessage message) {
109         FullHttpMessage previous = stream.setProperty(messageKey, message);
110         if (previous != message && previous != null) {
111             previous.release();
112         }
113     }
114 
115     @Override
116     public void onStreamRemoved(Http2Stream stream) {
117         removeMessage(stream, true);
118     }
119 
120     /**
121      * Set final headers and fire a channel read event
122      *
123      * @param ctx The context to fire the event on
124      * @param msg The message to send
125      * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
126      * @param stream the stream of the message which is being fired
127      */
128     protected void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, boolean release,
129                                    Http2Stream stream) {
130         removeMessage(stream, release);
131         HttpUtil.setContentLength(msg, msg.content().readableBytes());
132         ctx.fireChannelRead(msg);
133     }
134 
135     /**
136      * Create a new {@link FullHttpMessage} based upon the current connection parameters
137      *
138      * @param stream The stream to create a message for
139      * @param headers The headers associated with {@code stream}
140      * @param validateHttpHeaders
141      * <ul>
142      * <li>{@code true} to validate HTTP headers in the http-codec</li>
143      * <li>{@code false} not to validate HTTP headers in the http-codec</li>
144      * </ul>
145      * @param alloc The {@link ByteBufAllocator} to use to generate the content of the message
146      * @throws Http2Exception If there is an error when creating {@link FullHttpMessage} from
147      *                        {@link Http2Stream} and {@link Http2Headers}
148      */
149     protected FullHttpMessage newMessage(Http2Stream stream, Http2Headers headers, boolean validateHttpHeaders,
150                                          ByteBufAllocator alloc) throws Http2Exception {
151         return connection.isServer() ? HttpConversionUtil.toFullHttpRequest(stream.id(), headers, alloc,
152                 validateHttpHeaders) : HttpConversionUtil.toFullHttpResponse(stream.id(), headers, alloc,
153                 validateHttpHeaders);
154     }
155 
156     /**
157      * Provides translation between HTTP/2 and HTTP header objects while ensuring the stream
158      * is in a valid state for additional headers.
159      *
160      * @param ctx The context for which this message has been received.
161      * Used to send informational header if detected.
162      * @param stream The stream the {@code headers} apply to
163      * @param headers The headers to process
164      * @param endOfStream {@code true} if the {@code stream} has received the end of stream flag
165      * @param allowAppend
166      * <ul>
167      * <li>{@code true} if headers will be appended if the stream already exists.</li>
168      * <li>if {@code false} and the stream already exists this method returns {@code null}.</li>
169      * </ul>
170      * @param appendToTrailer
171      * <ul>
172      * <li>{@code true} if a message {@code stream} already exists then the headers
173      * should be added to the trailing headers.</li>
174      * <li>{@code false} then appends will be done to the initial headers.</li>
175      * </ul>
176      * @return The object used to track the stream corresponding to {@code stream}. {@code null} if
177      *         {@code allowAppend} is {@code false} and the stream already exists.
178      * @throws Http2Exception If the stream id is not in the correct state to process the headers request
179      */
180     protected FullHttpMessage processHeadersBegin(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
181                                                   boolean endOfStream, boolean allowAppend, boolean appendToTrailer)
182             throws Http2Exception {
183         FullHttpMessage msg = getMessage(stream);
184         boolean release = true;
185         if (msg == null) {
186             msg = newMessage(stream, headers, validateHttpHeaders, ctx.alloc());
187         } else if (allowAppend) {
188             release = false;
189             HttpConversionUtil.addHttp2ToHttpHeaders(stream.id(), headers, msg, appendToTrailer);
190         } else {
191             release = false;
192             msg = null;
193         }
194 
195         if (sendDetector.mustSendImmediately(msg)) {
196             // Copy the message (if necessary) before sending. The content is not expected to be copied (or used) in
197             // this operation but just in case it is used do the copy before sending and the resource may be released
198             final FullHttpMessage copy = endOfStream ? null : sendDetector.copyIfNeeded(ctx.alloc(), msg);
199             fireChannelRead(ctx, msg, release, stream);
200             return copy;
201         }
202 
203         return msg;
204     }
205 
206     /**
207      * After HTTP/2 headers have been processed by {@link #processHeadersBegin} this method either
208      * sends the result up the pipeline or retains the message for future processing.
209      *
210      * @param ctx The context for which this message has been received
211      * @param stream The stream the {@code objAccumulator} corresponds to
212      * @param msg The object which represents all headers/data for corresponding to {@code stream}
213      * @param endOfStream {@code true} if this is the last event for the stream
214      */
215     private void processHeadersEnd(ChannelHandlerContext ctx, Http2Stream stream, FullHttpMessage msg,
216                                    boolean endOfStream) {
217         if (endOfStream) {
218             // Release if the msg from the map is different from the object being forwarded up the pipeline.
219             fireChannelRead(ctx, msg, getMessage(stream) != msg, stream);
220         } else {
221             putMessage(stream, msg);
222         }
223     }
224 
225     @Override
226     public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
227             throws Http2Exception {
228         Http2Stream stream = connection.stream(streamId);
229         FullHttpMessage msg = getMessage(stream);
230         if (msg == null) {
231             throw connectionError(PROTOCOL_ERROR, "Data Frame received for unknown stream id %d", streamId);
232         }
233 
234         ByteBuf content = msg.content();
235         final int dataReadableBytes = data.readableBytes();
236         if (content.readableBytes() > maxContentLength - dataReadableBytes) {
237             throw streamError(streamId, ENHANCE_YOUR_CALM,
238                     "Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
239         }
240 
241         content.writeBytes(data, data.readerIndex(), dataReadableBytes);
242 
243         if (endOfStream) {
244             fireChannelRead(ctx, msg, false, stream);
245         }
246 
247         // All bytes have been processed.
248         return dataReadableBytes + padding;
249     }
250 
251     @Override
252     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
253                               boolean endOfStream) throws Http2Exception {
254         Http2Stream stream = connection.stream(streamId);
255         FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
256         if (msg != null) {
257             processHeadersEnd(ctx, stream, msg, endOfStream);
258         }
259     }
260 
261     @Override
262     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
263                               short weight, boolean exclusive, int padding, boolean endOfStream)
264             throws Http2Exception {
265         Http2Stream stream = connection.stream(streamId);
266         FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
267         if (msg != null) {
268             // Add headers for dependency and weight.
269             // See https://github.com/netty/netty/issues/5866
270             if (streamDependency != Http2CodecUtil.CONNECTION_STREAM_ID) {
271                 msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_DEPENDENCY_ID.text(),
272                         streamDependency);
273             }
274             msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(), weight);
275 
276             processHeadersEnd(ctx, stream, msg, endOfStream);
277         }
278     }
279 
280     @Override
281     public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
282         Http2Stream stream = connection.stream(streamId);
283         FullHttpMessage msg = getMessage(stream);
284         if (msg != null) {
285             onRstStreamRead(stream, msg);
286         }
287         ctx.fireExceptionCaught(Http2Exception.streamError(streamId, Http2Error.valueOf(errorCode),
288                 "HTTP/2 to HTTP layer caught stream reset"));
289     }
290 
291     @Override
292     public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
293                                   Http2Headers headers, int padding) throws Http2Exception {
294         // A push promise should not be allowed to add headers to an existing stream
295         Http2Stream promisedStream = connection.stream(promisedStreamId);
296         if (headers.status() == null) {
297             // A PUSH_PROMISE frame has no Http response status.
298             // https://tools.ietf.org/html/rfc7540#section-8.2.1
299             // Server push is semantically equivalent to a server responding to a
300             // request; however, in this case, that request is also sent by the
301             // server, as a PUSH_PROMISE frame.
302             headers.status(OK.codeAsText());
303         }
304         FullHttpMessage msg = processHeadersBegin(ctx, promisedStream, headers, false, false, false);
305         if (msg == null) {
306             throw connectionError(PROTOCOL_ERROR, "Push Promise Frame received for pre-existing stream id %d",
307                     promisedStreamId);
308         }
309 
310         msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), streamId);
311         msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(),
312                 Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
313 
314         processHeadersEnd(ctx, promisedStream, msg, false);
315     }
316 
317     @Override
318     public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
319         if (propagateSettings) {
320             // Provide an interface for non-listeners to capture settings
321             ctx.fireChannelRead(settings);
322         }
323     }
324 
325     /**
326      * Called if a {@code RST_STREAM} is received but we have some data for that stream.
327      */
328     protected void onRstStreamRead(Http2Stream stream, FullHttpMessage msg) {
329         removeMessage(stream, true);
330     }
331 
332     /**
333      * Allows messages to be sent up the pipeline before the next phase in the
334      * HTTP message flow is detected.
335      */
336     private interface ImmediateSendDetector {
337         /**
338          * Determine if the response should be sent immediately, or wait for the end of the stream
339          *
340          * @param msg The response to test
341          * @return {@code true} if the message should be sent immediately
342          *         {@code false) if we should wait for the end of the stream
343          */
344         boolean mustSendImmediately(FullHttpMessage msg);
345 
346         /**
347          * Determine if a copy must be made after an immediate send happens.
348          * <p>
349          * An example of this use case is if a request is received
350          * with a 'Expect: 100-continue' header. The message will be sent immediately,
351          * and the data will be queued and sent at the end of the stream.
352          *
353          * @param allocator The {@link ByteBufAllocator} that can be used to allocate
354          * @param msg The message which has just been sent due to {@link #mustSendImmediately(FullHttpMessage)}
355          * @return A modified copy of the {@code msg} or {@code null} if a copy is not needed.
356          */
357         FullHttpMessage copyIfNeeded(ByteBufAllocator allocator, FullHttpMessage msg);
358     }
359 }