View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.handler.codec.http2;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.channel.ChannelHandlerContext;
20  import io.netty5.handler.codec.http.DefaultFullHttpRequest;
21  import io.netty5.handler.codec.http.FullHttpMessage;
22  import io.netty5.handler.codec.http.FullHttpRequest;
23  import io.netty5.handler.codec.http.FullHttpResponse;
24  import io.netty5.handler.codec.http.HttpHeaderNames;
25  import io.netty5.handler.codec.http.HttpStatusClass;
26  import io.netty5.handler.codec.http.HttpUtil;
27  import io.netty5.util.internal.UnstableApi;
28  
29  import static io.netty5.handler.codec.http.HttpResponseStatus.OK;
30  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
31  import static io.netty5.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
32  import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
33  import static io.netty5.util.internal.ObjectUtil.checkPositive;
34  import static java.util.Objects.requireNonNull;
35  
36  /**
37   * This adapter provides just header/data events from the HTTP message flow defined
38   * in <a href="https://tools.ietf.org/html/rfc7540#section-8.1">[RFC 7540], Section 8.1</a>.
39   * <p>
40   * See {@link HttpToHttp2ConnectionHandler} to get translation from HTTP/1.x objects to HTTP/2 frames for writes.
41   */
42  @UnstableApi
43  public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
44      private static final ImmediateSendDetector DEFAULT_SEND_DETECTOR = new ImmediateSendDetector() {
45          @Override
46          public boolean mustSendImmediately(FullHttpMessage<?> msg) {
47              if (msg instanceof FullHttpResponse) {
48                  return ((FullHttpResponse) msg).status().codeClass() == HttpStatusClass.INFORMATIONAL;
49              }
50              if (msg instanceof FullHttpRequest) {
51                  return msg.headers().contains(HttpHeaderNames.EXPECT);
52              }
53              return false;
54          }
55  
56          @Override
57          public FullHttpMessage<?> copyIfNeeded(BufferAllocator allocator, FullHttpMessage<?> msg) {
58              if (msg instanceof FullHttpRequest) {
59                  final FullHttpRequest original = (FullHttpRequest) msg;
60                  FullHttpRequest copy = new DefaultFullHttpRequest(original.protocolVersion(), original.method(),
61                          original.uri(), allocator.allocate(0), original.headers(), original.trailingHeaders());
62                  copy.headers().remove(HttpHeaderNames.EXPECT);
63                  return copy;
64              }
65              return null;
66          }
67      };
68  
69      private final int maxContentLength;
70      private final ImmediateSendDetector sendDetector;
71      private final Http2Connection.PropertyKey messageKey;
72      private final boolean propagateSettings;
73      protected final Http2Connection connection;
74      protected final boolean validateHttpHeaders;
75  
76      protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
77                                          boolean validateHttpHeaders, boolean propagateSettings) {
78  
79          requireNonNull(connection, "connection");
80          this.connection = connection;
81          this.maxContentLength = checkPositive(maxContentLength, "maxContentLength");
82          this.validateHttpHeaders = validateHttpHeaders;
83          this.propagateSettings = propagateSettings;
84          sendDetector = DEFAULT_SEND_DETECTOR;
85          messageKey = connection.newKey();
86      }
87  
88      /**
89       * The stream is out of scope for the HTTP message flow and will no longer be tracked
90       * @param stream The stream to remove associated state with
91       * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
92       */
93      protected final void removeMessage(Http2Stream stream, boolean release) {
94          FullHttpMessage<?> msg = stream.removeProperty(messageKey);
95          if (release && msg != null) {
96              msg.close();
97          }
98      }
99  
100     /**
101      * Get the {@link FullHttpMessage} associated with {@code stream}.
102      * @param stream The stream to get the associated state from
103      * @return The {@link FullHttpMessage} associated with {@code stream}.
104      */
105     protected final FullHttpMessage<?> getMessage(Http2Stream stream) {
106         return stream.getProperty(messageKey);
107     }
108 
109     /**
110      * Make {@code message} be the state associated with {@code stream}.
111      * @param stream The stream which {@code message} is associated with.
112      * @param message The message which contains the HTTP semantics.
113      */
114     protected final void putMessage(Http2Stream stream, FullHttpMessage<?> message) {
115         FullHttpMessage<?> previous = stream.setProperty(messageKey, message);
116         if (previous != message && previous != null) {
117             previous.close();
118         }
119     }
120 
121     @Override
122     public void onStreamRemoved(Http2Stream stream) {
123         removeMessage(stream, true);
124     }
125 
126     /**
127      * Set final headers and fire a channel read event
128      *
129      * @param ctx The context to fire the event on
130      * @param msg The message to send
131      * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
132      * @param stream the stream of the message which is being fired
133      */
134     protected void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage<?> msg, boolean release,
135                                    Http2Stream stream) {
136         removeMessage(stream, release);
137         HttpUtil.setContentLength(msg, msg.payload().readableBytes());
138         ctx.fireChannelRead(msg);
139     }
140 
141     /**
142      * Create a new {@link FullHttpMessage} based upon the current connection parameters
143      *
144      * @param stream The stream to create a message for
145      * @param headers The headers associated with {@code stream}
146      * @param validateHttpHeaders
147      * <ul>
148      * <li>{@code true} to validate HTTP headers in the http-codec</li>
149      * <li>{@code false} not to validate HTTP headers in the http-codec</li>
150      * </ul>
151      * @param alloc The {@link BufferAllocator} to use to generate the content of the message
152      * @throws Http2Exception If there is an error when creating {@link FullHttpMessage} from
153      *                        {@link Http2Stream} and {@link Http2Headers}
154      */
155     protected FullHttpMessage<?> newMessage(Http2Stream stream, Http2Headers headers, boolean validateHttpHeaders,
156                                             BufferAllocator alloc) throws Http2Exception {
157         return connection.isServer() ? HttpConversionUtil.toFullHttpRequest(stream.id(), headers, alloc,
158                 validateHttpHeaders) : HttpConversionUtil.toFullHttpResponse(stream.id(), headers, alloc,
159                 validateHttpHeaders);
160     }
161 
162     /**
163      * Provides translation between HTTP/2 and HTTP header objects while ensuring the stream
164      * is in a valid state for additional headers.
165      *
166      * @param ctx The context for which this message has been received.
167      * Used to send informational header if detected.
168      * @param stream The stream the {@code headers} apply to
169      * @param headers The headers to process
170      * @param endOfStream {@code true} if the {@code stream} has received the end of stream flag
171      * @param allowAppend
172      * <ul>
173      * <li>{@code true} if headers will be appended if the stream already exists.</li>
174      * <li>if {@code false} and the stream already exists this method returns {@code null}.</li>
175      * </ul>
176      * @param appendToTrailer
177      * <ul>
178      * <li>{@code true} if a message {@code stream} already exists then the headers
179      * should be added to the trailing headers.</li>
180      * <li>{@code false} then appends will be done to the initial headers.</li>
181      * </ul>
182      * @return The object used to track the stream corresponding to {@code stream}. {@code null} if
183      *         {@code allowAppend} is {@code false} and the stream already exists.
184      * @throws Http2Exception If the stream id is not in the correct state to process the headers request
185      */
186     protected FullHttpMessage<?> processHeadersBegin(ChannelHandlerContext ctx, Http2Stream stream,
187                                                      Http2Headers headers, boolean endOfStream, boolean allowAppend,
188                                                      boolean appendToTrailer)
189             throws Http2Exception {
190         FullHttpMessage<?> msg = getMessage(stream);
191         boolean release = true;
192         if (msg == null) {
193             msg = newMessage(stream, headers, validateHttpHeaders, ctx.bufferAllocator());
194         } else if (allowAppend) {
195             release = false;
196             HttpConversionUtil.addHttp2ToHttpHeaders(stream.id(), headers, msg, appendToTrailer);
197         } else {
198             release = false;
199             msg = null;
200         }
201 
202         if (sendDetector.mustSendImmediately(msg)) {
203             // Copy the message (if necessary) before sending. The content is not expected to be copied (or used) in
204             // this operation but just in case it is used do the copy before sending and the resource may be released
205             final FullHttpMessage<?> copy = endOfStream ? null : sendDetector.copyIfNeeded(ctx.bufferAllocator(), msg);
206             fireChannelRead(ctx, msg, release, stream);
207             return copy;
208         }
209 
210         return msg;
211     }
212 
213     /**
214      * After HTTP/2 headers have been processed by {@link #processHeadersBegin} this method either
215      * sends the result up the pipeline or retains the message for future processing.
216      *
217      * @param ctx The context for which this message has been received
218      * @param stream The stream the {@code objAccumulator} corresponds to
219      * @param msg The object which represents all headers/data for corresponding to {@code stream}
220      * @param endOfStream {@code true} if this is the last event for the stream
221      */
222     private void processHeadersEnd(ChannelHandlerContext ctx, Http2Stream stream, FullHttpMessage<?> msg,
223                                    boolean endOfStream) {
224         if (endOfStream) {
225             // Release if the msg from the map is different from the object being forwarded up the pipeline.
226             fireChannelRead(ctx, msg, getMessage(stream) != msg, stream);
227         } else {
228             putMessage(stream, msg);
229         }
230     }
231 
232     @Override
233     public int onDataRead(ChannelHandlerContext ctx, int streamId, Buffer data, int padding, boolean endOfStream)
234             throws Http2Exception {
235         Http2Stream stream = connection.stream(streamId);
236         FullHttpMessage<?> msg = getMessage(stream);
237         if (msg == null) {
238             throw connectionError(PROTOCOL_ERROR, "Data Frame received for unknown stream id %d", streamId);
239         }
240 
241         Buffer content = msg.payload();
242         final int dataReadableBytes = data.readableBytes();
243         if (content.readableBytes() > maxContentLength - dataReadableBytes) {
244             throw connectionError(INTERNAL_ERROR,
245                     "Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
246         }
247 
248         content.ensureWritable(dataReadableBytes);
249         content.writeBytes(data);
250 
251         if (endOfStream) {
252             fireChannelRead(ctx, msg, false, stream);
253         }
254 
255         // All bytes have been processed.
256         return dataReadableBytes + padding;
257     }
258 
259     @Override
260     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
261                               boolean endOfStream) throws Http2Exception {
262         Http2Stream stream = connection.stream(streamId);
263         FullHttpMessage<?> msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
264         if (msg != null) {
265             processHeadersEnd(ctx, stream, msg, endOfStream);
266         }
267     }
268 
269     @Override
270     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
271                               short weight, boolean exclusive, int padding, boolean endOfStream)
272             throws Http2Exception {
273         Http2Stream stream = connection.stream(streamId);
274         FullHttpMessage<?> msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
275         if (msg != null) {
276             // Add headers for dependency and weight.
277             // See https://github.com/netty/netty/issues/5866
278             if (streamDependency != Http2CodecUtil.CONNECTION_STREAM_ID) {
279                 msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_DEPENDENCY_ID.text(),
280                         streamDependency);
281             }
282             msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(), weight);
283 
284             processHeadersEnd(ctx, stream, msg, endOfStream);
285         }
286     }
287 
288     @Override
289     public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
290         Http2Stream stream = connection.stream(streamId);
291         FullHttpMessage<?> msg = getMessage(stream);
292         if (msg != null) {
293             onRstStreamRead(stream, msg);
294         }
295         ctx.fireChannelExceptionCaught(Http2Exception.streamError(streamId, Http2Error.valueOf(errorCode),
296                 "HTTP/2 to HTTP layer caught stream reset"));
297     }
298 
299     @Override
300     public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
301                                   Http2Headers headers, int padding) throws Http2Exception {
302         // A push promise should not be allowed to add headers to an existing stream
303         Http2Stream promisedStream = connection.stream(promisedStreamId);
304         if (headers.status() == null) {
305             // A PUSH_PROMISE frame has no Http response status.
306             // https://tools.ietf.org/html/rfc7540#section-8.2.1
307             // Server push is semantically equivalent to a server responding to a
308             // request; however, in this case, that request is also sent by the
309             // server, as a PUSH_PROMISE frame.
310             headers.status(OK.codeAsText());
311         }
312         FullHttpMessage<?> msg = processHeadersBegin(ctx, promisedStream, headers, false, false, false);
313         if (msg == null) {
314             throw connectionError(PROTOCOL_ERROR, "Push Promise Frame received for pre-existing stream id %d",
315                     promisedStreamId);
316         }
317 
318         msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), streamId);
319         msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(),
320                 Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
321 
322         processHeadersEnd(ctx, promisedStream, msg, false);
323     }
324 
325     @Override
326     public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
327         if (propagateSettings) {
328             // Provide an interface for non-listeners to capture settings
329             ctx.fireChannelRead(settings);
330         }
331     }
332 
333     /**
334      * Called if a {@code RST_STREAM} is received but we have some data for that stream.
335      */
336     protected void onRstStreamRead(Http2Stream stream, FullHttpMessage<?> msg) {
337         removeMessage(stream, true);
338     }
339 
340     /**
341      * Allows messages to be sent up the pipeline before the next phase in the
342      * HTTP message flow is detected.
343      */
344     private interface ImmediateSendDetector {
345         /**
346          * Determine if the response should be sent immediately, or wait for the end of the stream
347          *
348          * @param msg The response to test
349          * @return {@code true} if the message should be sent immediately
350          *         {@code false) if we should wait for the end of the stream
351          */
352         boolean mustSendImmediately(FullHttpMessage<?> msg);
353 
354         /**
355          * Determine if a copy must be made after an immediate send happens.
356          * <p>
357          * An example of this use case is if a request is received
358          * with a 'Expect: 100-continue' header. The message will be sent immediately,
359          * and the data will be queued and sent at the end of the stream.
360          *
361          * @param allocator The {@link BufferAllocator} that can be used to allocate
362          * @param msg The message which has just been sent due to {@link #mustSendImmediately(FullHttpMessage)}
363          * @return A modified copy of the {@code msg} or {@code null} if a copy is not needed.
364          */
365         FullHttpMessage<?> copyIfNeeded(BufferAllocator allocator, FullHttpMessage<?> msg);
366     }
367 }