View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.http.FullHttpMessage;
21  import io.netty.handler.codec.http.FullHttpRequest;
22  import io.netty.handler.codec.http.FullHttpResponse;
23  import io.netty.handler.codec.http.HttpHeaderNames;
24  import io.netty.handler.codec.http.HttpStatusClass;
25  import io.netty.handler.codec.http.HttpUtil;
26  
27  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
28  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
29  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
30  import static io.netty.handler.codec.http.HttpResponseStatus.OK;
31  import static io.netty.util.internal.ObjectUtil.checkNotNull;
32  import static io.netty.util.internal.ObjectUtil.checkPositive;
33  
34  /**
35   * This adapter provides just header/data events from the HTTP message flow defined
36   * in <a href="https://tools.ietf.org/html/rfc7540#section-8.1">[RFC 7540], Section 8.1</a>.
37   * <p>
38   * See {@link HttpToHttp2ConnectionHandler} to get translation from HTTP/1.x objects to HTTP/2 frames for writes.
39   */
40  public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
41      private static final ImmediateSendDetector DEFAULT_SEND_DETECTOR = new ImmediateSendDetector() {
42          @Override
43          public boolean mustSendImmediately(FullHttpMessage msg) {
44              if (msg instanceof FullHttpResponse) {
45                  return ((FullHttpResponse) msg).status().codeClass() == HttpStatusClass.INFORMATIONAL;
46              }
47              if (msg instanceof FullHttpRequest) {
48                  return msg.headers().contains(HttpHeaderNames.EXPECT);
49              }
50              return false;
51          }
52  
53          @Override
54          public FullHttpMessage copyIfNeeded(ByteBufAllocator allocator, FullHttpMessage msg) {
55              if (msg instanceof FullHttpRequest) {
56                  FullHttpRequest copy = ((FullHttpRequest) msg).replace(allocator.buffer(0));
57                  copy.headers().remove(HttpHeaderNames.EXPECT);
58                  return copy;
59              }
60              return null;
61          }
62      };
63  
64      private final int maxContentLength;
65      private final ImmediateSendDetector sendDetector;
66      private final Http2Connection.PropertyKey messageKey;
67      private final boolean propagateSettings;
68      protected final Http2Connection connection;
69      protected final boolean validateHttpHeaders;
70  
71      protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
72                                          boolean validateHttpHeaders, boolean propagateSettings) {
73          this.connection = checkNotNull(connection, "connection");
74          this.maxContentLength = checkPositive(maxContentLength, "maxContentLength");
75          this.validateHttpHeaders = validateHttpHeaders;
76          this.propagateSettings = propagateSettings;
77          sendDetector = DEFAULT_SEND_DETECTOR;
78          messageKey = connection.newKey();
79      }
80  
81      /**
82       * The stream is out of scope for the HTTP message flow and will no longer be tracked
83       * @param stream The stream to remove associated state with
84       * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
85       */
86      protected final void removeMessage(Http2Stream stream, boolean release) {
87          FullHttpMessage msg = stream.removeProperty(messageKey);
88          if (release && msg != null) {
89              msg.release();
90          }
91      }
92  
93      /**
94       * Get the {@link FullHttpMessage} associated with {@code stream}.
95       * @param stream The stream to get the associated state from
96       * @return The {@link FullHttpMessage} associated with {@code stream}.
97       */
98      protected final FullHttpMessage getMessage(Http2Stream stream) {
99          return (FullHttpMessage) stream.getProperty(messageKey);
100     }
101 
102     /**
103      * Make {@code message} be the state associated with {@code stream}.
104      * @param stream The stream which {@code message} is associated with.
105      * @param message The message which contains the HTTP semantics.
106      */
107     protected final void putMessage(Http2Stream stream, FullHttpMessage message) {
108         FullHttpMessage previous = stream.setProperty(messageKey, message);
109         if (previous != message && previous != null) {
110             previous.release();
111         }
112     }
113 
114     @Override
115     public void onStreamRemoved(Http2Stream stream) {
116         removeMessage(stream, true);
117     }
118 
119     /**
120      * Set final headers and fire a channel read event
121      *
122      * @param ctx The context to fire the event on
123      * @param msg The message to send
124      * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
125      * @param stream the stream of the message which is being fired
126      */
127     protected void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, boolean release,
128                                    Http2Stream stream) {
129         removeMessage(stream, release);
130         HttpUtil.setContentLength(msg, msg.content().readableBytes());
131         ctx.fireChannelRead(msg);
132     }
133 
134     /**
135      * Create a new {@link FullHttpMessage} based upon the current connection parameters
136      *
137      * @param stream The stream to create a message for
138      * @param headers The headers associated with {@code stream}
139      * @param validateHttpHeaders
140      * <ul>
141      * <li>{@code true} to validate HTTP headers in the http-codec</li>
142      * <li>{@code false} not to validate HTTP headers in the http-codec</li>
143      * </ul>
144      * @param alloc The {@link ByteBufAllocator} to use to generate the content of the message
145      * @throws Http2Exception If there is an error when creating {@link FullHttpMessage} from
146      *                        {@link Http2Stream} and {@link Http2Headers}
147      */
148     protected FullHttpMessage newMessage(Http2Stream stream, Http2Headers headers, boolean validateHttpHeaders,
149                                          ByteBufAllocator alloc) throws Http2Exception {
150         return connection.isServer() ? HttpConversionUtil.toFullHttpRequest(stream.id(), headers, alloc,
151                 validateHttpHeaders) : HttpConversionUtil.toFullHttpResponse(stream.id(), headers, alloc,
152                 validateHttpHeaders);
153     }
154 
155     /**
156      * Provides translation between HTTP/2 and HTTP header objects while ensuring the stream
157      * is in a valid state for additional headers.
158      *
159      * @param ctx The context for which this message has been received.
160      * Used to send informational header if detected.
161      * @param stream The stream the {@code headers} apply to
162      * @param headers The headers to process
163      * @param endOfStream {@code true} if the {@code stream} has received the end of stream flag
164      * @param allowAppend
165      * <ul>
166      * <li>{@code true} if headers will be appended if the stream already exists.</li>
167      * <li>if {@code false} and the stream already exists this method returns {@code null}.</li>
168      * </ul>
169      * @param appendToTrailer
170      * <ul>
171      * <li>{@code true} if a message {@code stream} already exists then the headers
172      * should be added to the trailing headers.</li>
173      * <li>{@code false} then appends will be done to the initial headers.</li>
174      * </ul>
175      * @return The object used to track the stream corresponding to {@code stream}. {@code null} if
176      *         {@code allowAppend} is {@code false} and the stream already exists.
177      * @throws Http2Exception If the stream id is not in the correct state to process the headers request
178      */
179     protected FullHttpMessage processHeadersBegin(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
180                                                   boolean endOfStream, boolean allowAppend, boolean appendToTrailer)
181             throws Http2Exception {
182         FullHttpMessage msg = getMessage(stream);
183         boolean release = true;
184         if (msg == null) {
185             msg = newMessage(stream, headers, validateHttpHeaders, ctx.alloc());
186         } else if (allowAppend) {
187             release = false;
188             HttpConversionUtil.addHttp2ToHttpHeaders(stream.id(), headers, msg, appendToTrailer);
189         } else {
190             release = false;
191             msg = null;
192         }
193 
194         if (sendDetector.mustSendImmediately(msg)) {
195             // Copy the message (if necessary) before sending. The content is not expected to be copied (or used) in
196             // this operation but just in case it is used do the copy before sending and the resource may be released
197             final FullHttpMessage copy = endOfStream ? null : sendDetector.copyIfNeeded(ctx.alloc(), msg);
198             fireChannelRead(ctx, msg, release, stream);
199             return copy;
200         }
201 
202         return msg;
203     }
204 
205     /**
206      * After HTTP/2 headers have been processed by {@link #processHeadersBegin} this method either
207      * sends the result up the pipeline or retains the message for future processing.
208      *
209      * @param ctx The context for which this message has been received
210      * @param stream The stream the {@code objAccumulator} corresponds to
211      * @param msg The object which represents all headers/data for corresponding to {@code stream}
212      * @param endOfStream {@code true} if this is the last event for the stream
213      */
214     private void processHeadersEnd(ChannelHandlerContext ctx, Http2Stream stream, FullHttpMessage msg,
215                                    boolean endOfStream) {
216         if (endOfStream) {
217             // Release if the msg from the map is different from the object being forwarded up the pipeline.
218             fireChannelRead(ctx, msg, getMessage(stream) != msg, stream);
219         } else {
220             putMessage(stream, msg);
221         }
222     }
223 
224     @Override
225     public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
226             throws Http2Exception {
227         Http2Stream stream = connection.stream(streamId);
228         FullHttpMessage msg = getMessage(stream);
229         if (msg == null) {
230             throw connectionError(PROTOCOL_ERROR, "Data Frame received for unknown stream id %d", streamId);
231         }
232 
233         ByteBuf content = msg.content();
234         final int dataReadableBytes = data.readableBytes();
235         if (content.readableBytes() > maxContentLength - dataReadableBytes) {
236             throw connectionError(INTERNAL_ERROR,
237                     "Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
238         }
239 
240         content.writeBytes(data, data.readerIndex(), dataReadableBytes);
241 
242         if (endOfStream) {
243             fireChannelRead(ctx, msg, false, stream);
244         }
245 
246         // All bytes have been processed.
247         return dataReadableBytes + padding;
248     }
249 
250     @Override
251     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
252                               boolean endOfStream) throws Http2Exception {
253         Http2Stream stream = connection.stream(streamId);
254         FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
255         if (msg != null) {
256             processHeadersEnd(ctx, stream, msg, endOfStream);
257         }
258     }
259 
260     @Override
261     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
262                               short weight, boolean exclusive, int padding, boolean endOfStream)
263             throws Http2Exception {
264         Http2Stream stream = connection.stream(streamId);
265         FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
266         if (msg != null) {
267             // Add headers for dependency and weight.
268             // See https://github.com/netty/netty/issues/5866
269             if (streamDependency != Http2CodecUtil.CONNECTION_STREAM_ID) {
270                 msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_DEPENDENCY_ID.text(),
271                         streamDependency);
272             }
273             msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(), weight);
274 
275             processHeadersEnd(ctx, stream, msg, endOfStream);
276         }
277     }
278 
279     @Override
280     public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
281         Http2Stream stream = connection.stream(streamId);
282         FullHttpMessage msg = getMessage(stream);
283         if (msg != null) {
284             onRstStreamRead(stream, msg);
285         }
286         ctx.fireExceptionCaught(Http2Exception.streamError(streamId, Http2Error.valueOf(errorCode),
287                 "HTTP/2 to HTTP layer caught stream reset"));
288     }
289 
290     @Override
291     public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
292                                   Http2Headers headers, int padding) throws Http2Exception {
293         // A push promise should not be allowed to add headers to an existing stream
294         Http2Stream promisedStream = connection.stream(promisedStreamId);
295         if (headers.status() == null) {
296             // A PUSH_PROMISE frame has no Http response status.
297             // https://tools.ietf.org/html/rfc7540#section-8.2.1
298             // Server push is semantically equivalent to a server responding to a
299             // request; however, in this case, that request is also sent by the
300             // server, as a PUSH_PROMISE frame.
301             headers.status(OK.codeAsText());
302         }
303         FullHttpMessage msg = processHeadersBegin(ctx, promisedStream, headers, false, false, false);
304         if (msg == null) {
305             throw connectionError(PROTOCOL_ERROR, "Push Promise Frame received for pre-existing stream id %d",
306                     promisedStreamId);
307         }
308 
309         msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), streamId);
310         msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(),
311                 Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
312 
313         processHeadersEnd(ctx, promisedStream, msg, false);
314     }
315 
316     @Override
317     public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
318         if (propagateSettings) {
319             // Provide an interface for non-listeners to capture settings
320             ctx.fireChannelRead(settings);
321         }
322     }
323 
324     /**
325      * Called if a {@code RST_STREAM} is received but we have some data for that stream.
326      */
327     protected void onRstStreamRead(Http2Stream stream, FullHttpMessage msg) {
328         removeMessage(stream, true);
329     }
330 
331     /**
332      * Allows messages to be sent up the pipeline before the next phase in the
333      * HTTP message flow is detected.
334      */
335     private interface ImmediateSendDetector {
336         /**
337          * Determine if the response should be sent immediately, or wait for the end of the stream
338          *
339          * @param msg The response to test
340          * @return {@code true} if the message should be sent immediately
341          *         {@code false) if we should wait for the end of the stream
342          */
343         boolean mustSendImmediately(FullHttpMessage msg);
344 
345         /**
346          * Determine if a copy must be made after an immediate send happens.
347          * <p>
348          * An example of this use case is if a request is received
349          * with a 'Expect: 100-continue' header. The message will be sent immediately,
350          * and the data will be queued and sent at the end of the stream.
351          *
352          * @param allocator The {@link ByteBufAllocator} that can be used to allocate
353          * @param msg The message which has just been sent due to {@link #mustSendImmediately(FullHttpMessage)}
354          * @return A modified copy of the {@code msg} or {@code null} if a copy is not needed.
355          */
356         FullHttpMessage copyIfNeeded(ByteBufAllocator allocator, FullHttpMessage msg);
357     }
358 }