View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.http.FullHttpMessage;
22  import io.netty.handler.codec.http.FullHttpRequest;
23  import io.netty.handler.codec.http.FullHttpResponse;
24  import io.netty.handler.codec.http.HttpHeaderNames;
25  import io.netty.handler.codec.http.HttpStatusClass;
26  import io.netty.handler.codec.http.HttpUtil;
27  import io.netty.util.internal.UnstableApi;
28  
29  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
30  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
31  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
32  import static io.netty.handler.codec.http.HttpResponseStatus.OK;
33  import static io.netty.util.internal.ObjectUtil.checkNotNull;
34  
35  /**
36   * This adapter provides just header/data events from the HTTP message flow defined
37   * here <a href="http://tools.ietf.org/html/draft-ietf-httpbis-http2-16#section-8.1.">HTTP/2 Spec Message Flow</a>.
38   * <p>
39   * See {@link HttpToHttp2ConnectionHandler} to get translation from HTTP/1.x objects to HTTP/2 frames for writes.
40   */
41  @UnstableApi
42  public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
43      private static final ImmediateSendDetector DEFAULT_SEND_DETECTOR = new ImmediateSendDetector() {
44          @Override
45          public boolean mustSendImmediately(FullHttpMessage msg) {
46              if (msg instanceof FullHttpResponse) {
47                  return ((FullHttpResponse) msg).status().codeClass() == HttpStatusClass.INFORMATIONAL;
48              }
49              if (msg instanceof FullHttpRequest) {
50                  return msg.headers().contains(HttpHeaderNames.EXPECT);
51              }
52              return false;
53          }
54  
55          @Override
56          public FullHttpMessage copyIfNeeded(FullHttpMessage msg) {
57              if (msg instanceof FullHttpRequest) {
58                  FullHttpRequest copy = ((FullHttpRequest) msg).replace(Unpooled.buffer(0));
59                  copy.headers().remove(HttpHeaderNames.EXPECT);
60                  return copy;
61              }
62              return null;
63          }
64      };
65  
66      private final int maxContentLength;
67      private final ImmediateSendDetector sendDetector;
68      private final Http2Connection.PropertyKey messageKey;
69      private final boolean propagateSettings;
70      protected final Http2Connection connection;
71      protected final boolean validateHttpHeaders;
72  
73      protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
74                                          boolean validateHttpHeaders, boolean propagateSettings) {
75  
76          checkNotNull(connection, "connection");
77          if (maxContentLength <= 0) {
78              throw new IllegalArgumentException("maxContentLength: " + maxContentLength + " (expected: > 0)");
79          }
80          this.connection = connection;
81          this.maxContentLength = maxContentLength;
82          this.validateHttpHeaders = validateHttpHeaders;
83          this.propagateSettings = propagateSettings;
84          sendDetector = DEFAULT_SEND_DETECTOR;
85          messageKey = connection.newKey();
86      }
87  
88      /**
89       * The stream is out of scope for the HTTP message flow and will no longer be tracked
90       * @param stream The stream to remove associated state with
91       * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
92       */
93      protected final void removeMessage(Http2Stream stream, boolean release) {
94          FullHttpMessage msg = stream.removeProperty(messageKey);
95          if (release && msg != null) {
96              msg.release();
97          }
98      }
99  
100     /**
101      * Get the {@link FullHttpMessage} associated with {@code stream}.
102      * @param stream The stream to get the associated state from
103      * @return The {@link FullHttpMessage} associated with {@code stream}.
104      */
105     protected final FullHttpMessage getMessage(Http2Stream stream) {
106         return (FullHttpMessage) stream.getProperty(messageKey);
107     }
108 
109     /**
110      * Make {@code message} be the state associated with {@code stream}.
111      * @param stream The stream which {@code message} is associated with.
112      * @param message The message which contains the HTTP semantics.
113      */
114     protected final void putMessage(Http2Stream stream, FullHttpMessage message) {
115         FullHttpMessage previous = stream.setProperty(messageKey, message);
116         if (previous != message && previous != null) {
117             previous.release();
118         }
119     }
120 
121     @Override
122     public void onStreamRemoved(Http2Stream stream) {
123         removeMessage(stream, true);
124     }
125 
126     /**
127      * Set final headers and fire a channel read event
128      *
129      * @param ctx The context to fire the event on
130      * @param msg The message to send
131      * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
132      * @param stream the stream of the message which is being fired
133      */
134     protected void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, boolean release,
135                                    Http2Stream stream) {
136         removeMessage(stream, release);
137         HttpUtil.setContentLength(msg, msg.content().readableBytes());
138         ctx.fireChannelRead(msg);
139     }
140 
141     /**
142      * Create a new {@link FullHttpMessage} based upon the current connection parameters
143      *
144      * @param stream The stream to create a message for
145      * @param headers The headers associated with {@code stream}
146      * @param validateHttpHeaders
147      * <ul>
148      * <li>{@code true} to validate HTTP headers in the http-codec</li>
149      * <li>{@code false} not to validate HTTP headers in the http-codec</li>
150      * </ul>
151      * @param alloc The {@link ByteBufAllocator} to use to generate the content of the message
152      * @throws Http2Exception
153      */
154     protected FullHttpMessage newMessage(Http2Stream stream, Http2Headers headers, boolean validateHttpHeaders,
155                                          ByteBufAllocator alloc)
156             throws Http2Exception {
157         return connection.isServer() ? HttpConversionUtil.toFullHttpRequest(stream.id(), headers, alloc,
158                 validateHttpHeaders) : HttpConversionUtil.toFullHttpResponse(stream.id(), headers, alloc,
159                                                                          validateHttpHeaders);
160     }
161 
162     /**
163      * Provides translation between HTTP/2 and HTTP header objects while ensuring the stream
164      * is in a valid state for additional headers.
165      *
166      * @param ctx The context for which this message has been received.
167      * Used to send informational header if detected.
168      * @param stream The stream the {@code headers} apply to
169      * @param headers The headers to process
170      * @param endOfStream {@code true} if the {@code stream} has received the end of stream flag
171      * @param allowAppend
172      * <ul>
173      * <li>{@code true} if headers will be appended if the stream already exists.</li>
174      * <li>if {@code false} and the stream already exists this method returns {@code null}.</li>
175      * </ul>
176      * @param appendToTrailer
177      * <ul>
178      * <li>{@code true} if a message {@code stream} already exists then the headers
179      * should be added to the trailing headers.</li>
180      * <li>{@code false} then appends will be done to the initial headers.</li>
181      * </ul>
182      * @return The object used to track the stream corresponding to {@code stream}. {@code null} if
183      *         {@code allowAppend} is {@code false} and the stream already exists.
184      * @throws Http2Exception If the stream id is not in the correct state to process the headers request
185      */
186     protected FullHttpMessage processHeadersBegin(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
187                 boolean endOfStream, boolean allowAppend, boolean appendToTrailer) throws Http2Exception {
188         FullHttpMessage msg = getMessage(stream);
189         boolean release = true;
190         if (msg == null) {
191             msg = newMessage(stream, headers, validateHttpHeaders, ctx.alloc());
192         } else if (allowAppend) {
193             release = false;
194             HttpConversionUtil.addHttp2ToHttpHeaders(stream.id(), headers, msg, appendToTrailer);
195         } else {
196             release = false;
197             msg = null;
198         }
199 
200         if (sendDetector.mustSendImmediately(msg)) {
201             // Copy the message (if necessary) before sending. The content is not expected to be copied (or used) in
202             // this operation but just in case it is used do the copy before sending and the resource may be released
203             final FullHttpMessage copy = endOfStream ? null : sendDetector.copyIfNeeded(msg);
204             fireChannelRead(ctx, msg, release, stream);
205             return copy;
206         }
207 
208         return msg;
209     }
210 
211     /**
212      * After HTTP/2 headers have been processed by {@link #processHeadersBegin} this method either
213      * sends the result up the pipeline or retains the message for future processing.
214      *
215      * @param ctx The context for which this message has been received
216      * @param stream The stream the {@code objAccumulator} corresponds to
217      * @param msg The object which represents all headers/data for corresponding to {@code stream}
218      * @param endOfStream {@code true} if this is the last event for the stream
219      */
220     private void processHeadersEnd(ChannelHandlerContext ctx, Http2Stream stream, FullHttpMessage msg,
221                                    boolean endOfStream) {
222         if (endOfStream) {
223             // Release if the msg from the map is different from the object being forwarded up the pipeline.
224             fireChannelRead(ctx, msg, getMessage(stream) != msg, stream);
225         } else {
226             putMessage(stream, msg);
227         }
228     }
229 
230     @Override
231     public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
232                     throws Http2Exception {
233         Http2Stream stream = connection.stream(streamId);
234         FullHttpMessage msg = getMessage(stream);
235         if (msg == null) {
236             throw connectionError(PROTOCOL_ERROR, "Data Frame received for unknown stream id %d", streamId);
237         }
238 
239         ByteBuf content = msg.content();
240         final int dataReadableBytes = data.readableBytes();
241         if (content.readableBytes() > maxContentLength - dataReadableBytes) {
242             throw connectionError(INTERNAL_ERROR,
243                             "Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
244         }
245 
246         content.writeBytes(data, data.readerIndex(), dataReadableBytes);
247 
248         if (endOfStream) {
249             fireChannelRead(ctx, msg, false, stream);
250         }
251 
252         // All bytes have been processed.
253         return dataReadableBytes + padding;
254     }
255 
256     @Override
257     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
258                     boolean endOfStream) throws Http2Exception {
259         Http2Stream stream = connection.stream(streamId);
260         FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
261         if (msg != null) {
262             processHeadersEnd(ctx, stream, msg, endOfStream);
263         }
264     }
265 
266     @Override
267     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
268                     short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
269         Http2Stream stream = connection.stream(streamId);
270         FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
271         if (msg != null) {
272             // Add headers for dependency and weight.
273             // See https://github.com/netty/netty/issues/5866
274             if (streamDependency != Http2CodecUtil.CONNECTION_STREAM_ID) {
275                 msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_DEPENDENCY_ID.text(),
276                         streamDependency);
277             }
278             msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(), weight);
279 
280             processHeadersEnd(ctx, stream, msg, endOfStream);
281         }
282     }
283 
284     @Override
285     public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
286         Http2Stream stream = connection.stream(streamId);
287         FullHttpMessage msg = getMessage(stream);
288         if (msg != null) {
289             onRstStreamRead(stream, msg);
290         }
291         ctx.fireExceptionCaught(Http2Exception.streamError(streamId, Http2Error.valueOf(errorCode),
292                 "HTTP/2 to HTTP layer caught stream reset"));
293     }
294 
295     @Override
296     public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
297             Http2Headers headers, int padding) throws Http2Exception {
298         // A push promise should not be allowed to add headers to an existing stream
299         Http2Stream promisedStream = connection.stream(promisedStreamId);
300         if (headers.status() == null) {
301             // A PUSH_PROMISE frame has no Http response status.
302             // https://tools.ietf.org/html/rfc7540#section-8.2.1
303             // Server push is semantically equivalent to a server responding to a
304             // request; however, in this case, that request is also sent by the
305             // server, as a PUSH_PROMISE frame.
306             headers.status(OK.codeAsText());
307         }
308         FullHttpMessage msg = processHeadersBegin(ctx, promisedStream, headers, false, false, false);
309         if (msg == null) {
310             throw connectionError(PROTOCOL_ERROR, "Push Promise Frame received for pre-existing stream id %d",
311                             promisedStreamId);
312         }
313 
314         msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), streamId);
315         msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(),
316                 Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
317 
318         processHeadersEnd(ctx, promisedStream, msg, false);
319     }
320 
321     @Override
322     public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
323         if (propagateSettings) {
324             // Provide an interface for non-listeners to capture settings
325             ctx.fireChannelRead(settings);
326         }
327     }
328 
329     /**
330      * Called if a {@code RST_STREAM} is received but we have some data for that stream.
331      */
332     protected void onRstStreamRead(Http2Stream stream, FullHttpMessage msg) {
333         removeMessage(stream, true);
334     }
335 
336     /**
337      * Allows messages to be sent up the pipeline before the next phase in the
338      * HTTP message flow is detected.
339      */
340     private interface ImmediateSendDetector {
341         /**
342          * Determine if the response should be sent immediately, or wait for the end of the stream
343          *
344          * @param msg The response to test
345          * @return {@code true} if the message should be sent immediately
346          *         {@code false) if we should wait for the end of the stream
347          */
348         boolean mustSendImmediately(FullHttpMessage msg);
349 
350         /**
351          * Determine if a copy must be made after an immediate send happens.
352          * <p>
353          * An example of this use case is if a request is received
354          * with a 'Expect: 100-continue' header. The message will be sent immediately,
355          * and the data will be queued and sent at the end of the stream.
356          *
357          * @param msg The message which has just been sent due to {@link #mustSendImmediately(FullHttpMessage)}
358          * @return A modified copy of the {@code msg} or {@code null} if a copy is not needed.
359          */
360         FullHttpMessage copyIfNeeded(FullHttpMessage msg);
361     }
362 }