View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.http;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.buffer.api.CompositeBuffer;
21  import io.netty5.util.Send;
22  import io.netty5.channel.ChannelHandler;
23  import io.netty5.channel.ChannelHandlerContext;
24  import io.netty5.channel.ChannelPipeline;
25  import io.netty5.handler.codec.DecoderResult;
26  import io.netty5.handler.codec.MessageAggregator;
27  import io.netty5.util.concurrent.Future;
28  import io.netty5.util.internal.logging.InternalLogger;
29  import io.netty5.util.internal.logging.InternalLoggerFactory;
30  
31  import static io.netty5.handler.codec.http.HttpHeaderNames.CONNECTION;
32  import static io.netty5.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
33  import static io.netty5.handler.codec.http.HttpHeaderNames.EXPECT;
34  import static io.netty5.handler.codec.http.HttpResponseStatus.CONTINUE;
35  import static io.netty5.handler.codec.http.HttpResponseStatus.EXPECTATION_FAILED;
36  import static io.netty5.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
37  import static io.netty5.handler.codec.http.HttpUtil.getContentLength;
38  import static io.netty5.handler.codec.http.HttpVersion.HTTP_1_1;
39  import static java.util.Objects.requireNonNullElse;
40  
41  /**
42   * A {@link ChannelHandler} that aggregates an {@link HttpMessage}
43   * and its following {@link HttpContent}s into a single {@link FullHttpRequest}
44   * or {@link FullHttpResponse} (depending on if it used to handle requests or responses)
45   * with no following {@link HttpContent}s.  It is useful when you don't want to take
46   * care of HTTP messages whose transfer encoding is 'chunked'.  Insert this
47   * handler after {@link HttpResponseDecoder} in the {@link ChannelPipeline} if being used to handle
48   * responses, or after {@link HttpRequestDecoder} and {@link HttpResponseEncoder} in the
49   * {@link ChannelPipeline} if being used to handle requests.
50   * <blockquote>
51   * <pre>
52   *  {@link ChannelPipeline} p = ...;
53   *  ...
54   *  p.addLast("decoder", <b>new {@link HttpRequestDecoder}()</b>);
55   *  p.addLast("encoder", <b>new {@link HttpResponseEncoder}()</b>);
56   *  p.addLast("aggregator", <b>new {@link HttpObjectAggregator}(1048576)</b>);
57   *  ...
58   *  p.addLast("handler", new HttpRequestHandler());
59   *  </pre>
60   * </blockquote>
61   * <p>
62   * For convenience, consider putting a {@link HttpServerCodec} before the {@link HttpObjectAggregator}
63   * as it functions as both a {@link HttpRequestDecoder} and a {@link HttpResponseEncoder}.
64   * </p>
65   * Be aware that {@link HttpObjectAggregator} may end up sending a {@link HttpResponse}:
66   * <table border summary="Possible Responses">
67   * <tbody>
68   * <tr>
69   * <th>Response Status</th>
70   * <th>Condition When Sent</th>
71   * </tr>
72   * <tr>
73   * <td>100 Continue</td>
74   * <td>A '100-continue' expectation is received and the 'content-length' doesn't exceed maxContentLength</td>
75   * </tr>
76   * <tr>
77   * <td>417 Expectation Failed</td>
78   * <td>A '100-continue' expectation is received and the 'content-length' exceeds maxContentLength</td>
79   * </tr>
80   * <tr>
81   * <td>413 Request Entity Too Large</td>
82   * <td>Either the 'content-length' or the bytes received so far exceed maxContentLength</td>
83   * </tr>
84   * </tbody>
85   * </table>
86   *
87   * @see FullHttpRequest
88   * @see FullHttpResponse
89   * @see HttpResponseDecoder
90   * @see HttpServerCodec
91   */
92  public class HttpObjectAggregator<C extends HttpContent<C>>
93          extends MessageAggregator<HttpObject, HttpMessage, HttpContent<C>, FullHttpMessage<?>> {
94      private static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpObjectAggregator.class);
95  
96      private final boolean closeOnExpectationFailed;
97  
98      /**
99       * Creates a new instance.
100      *
101      * @param maxContentLength the maximum length of the aggregated content in bytes.
102      * If the length of the aggregated content exceeds this value,
103      * {@link #handleOversizedMessage(ChannelHandlerContext, Object)} will be called.
104      */
105     public HttpObjectAggregator(int maxContentLength) {
106         this(maxContentLength, false);
107     }
108 
109     /**
110      * Creates a new instance.
111      *
112      * @param maxContentLength the maximum length of the aggregated content in bytes.
113      * If the length of the aggregated content exceeds this value,
114      * {@link #handleOversizedMessage(ChannelHandlerContext, Object)} will be called.
115      * @param closeOnExpectationFailed If a 100-continue response is detected but the content length is too large
116      * then {@code true} means close the connection. otherwise the connection will remain open and data will be
117      * consumed and discarded until the next request is received.
118      */
119     public HttpObjectAggregator(int maxContentLength, boolean closeOnExpectationFailed) {
120         super(maxContentLength);
121         this.closeOnExpectationFailed = closeOnExpectationFailed;
122     }
123 
124     @Override
125     protected HttpMessage tryStartMessage(Object msg) {
126         return msg instanceof HttpMessage ? (HttpMessage) msg : null;
127     }
128 
129     @SuppressWarnings("unchecked")
130     @Override
131     protected HttpContent<C> tryContentMessage(Object msg) {
132         return msg instanceof HttpContent ? (HttpContent<C>) msg : null;
133     }
134 
135     @Override
136     protected boolean isAggregated(Object msg) throws Exception {
137         return msg instanceof FullHttpMessage;
138     }
139 
140     @Override
141     protected int lengthForContent(HttpContent<C> msg) {
142         return msg.payload().readableBytes();
143     }
144 
145     @Override
146     protected int lengthForAggregation(FullHttpMessage<?> msg) {
147         return msg.payload().readableBytes();
148     }
149 
150     @Override
151     protected boolean isLastContentMessage(HttpContent<C> msg) throws Exception {
152         return msg instanceof LastHttpContent;
153     }
154 
155     @Override
156     protected boolean isContentLengthInvalid(HttpMessage start, int maxContentLength) {
157         try {
158             return getContentLength(start, -1L) > maxContentLength;
159         } catch (final NumberFormatException e) {
160             return false;
161         }
162     }
163 
164     private static FullHttpResponse continueResponse(HttpMessage start, int maxContentLength,
165                                                      ChannelPipeline pipeline) {
166         if (HttpUtil.isUnsupportedExpectation(start)) {
167             // if the request contains an unsupported expectation, we return 417
168             pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);
169             return newErrorResponse(EXPECTATION_FAILED, pipeline.channel().bufferAllocator(), true, false);
170         } else if (HttpUtil.is100ContinueExpected(start)) {
171             // if the request contains 100-continue but the content-length is too large, we return 413
172             if (getContentLength(start, -1L) <= maxContentLength) {
173                 return newErrorResponse(CONTINUE, pipeline.channel().bufferAllocator(), false, false);
174             }
175             pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);
176             return newErrorResponse(REQUEST_ENTITY_TOO_LARGE, pipeline.channel().bufferAllocator(), true, false);
177         }
178 
179         return null;
180     }
181 
182     @Override
183     protected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
184         FullHttpResponse response = continueResponse(start, maxContentLength, pipeline);
185         // we're going to respond based on the request expectation so there's no
186         // need to propagate the expectation further.
187         if (response != null) {
188             start.headers().remove(EXPECT);
189         }
190         return response;
191     }
192 
193     @Override
194     protected boolean closeAfterContinueResponse(Object msg) {
195         return closeOnExpectationFailed && ignoreContentAfterContinueResponse(msg);
196     }
197 
198     @Override
199     protected boolean ignoreContentAfterContinueResponse(Object msg) {
200         if (msg instanceof HttpResponse) {
201             final HttpResponse httpResponse = (HttpResponse) msg;
202             return httpResponse.status().codeClass().equals(HttpStatusClass.CLIENT_ERROR);
203         }
204         return false;
205     }
206 
207     @Override
208     protected FullHttpMessage<?> beginAggregation(BufferAllocator allocator, HttpMessage start) throws Exception {
209         assert !(start instanceof FullHttpMessage);
210 
211         HttpUtil.setTransferEncodingChunked(start, false);
212 
213         final CompositeBuffer content = allocator.compose();
214         FullHttpMessage<?> ret;
215         if (start instanceof HttpRequest) {
216             ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);
217         } else if (start instanceof HttpResponse) {
218             ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);
219         } else {
220             throw new Error();
221         }
222         return ret;
223     }
224 
225     @Override
226     protected void aggregate(BufferAllocator allocator, FullHttpMessage<?> aggregated,
227                              HttpContent<C> content) throws Exception {
228         final CompositeBuffer payload = (CompositeBuffer) aggregated.payload();
229         payload.extendWith(content.payload().send());
230         if (content instanceof LastHttpContent) {
231             // Merge trailing headers into the message.
232             ((AggregatedFullHttpMessage<?>) aggregated).setTrailingHeaders(((LastHttpContent<?>) content)
233                     .trailingHeaders());
234         }
235     }
236 
237     @Override
238     protected void finishAggregation(BufferAllocator allocator, FullHttpMessage<?> aggregated) throws Exception {
239         // Set the 'Content-Length' header. If one isn't already set.
240         // This is important as HEAD responses will use a 'Content-Length' header which
241         // does not match the actual body, but the number of bytes that would be
242         // transmitted if a GET would have been used.
243         //
244         // See rfc2616 14.13 Content-Length
245         if (!HttpUtil.isContentLengthSet(aggregated)) {
246             aggregated.headers()
247                     .set(CONTENT_LENGTH, String.valueOf(aggregated.payload().readableBytes()));
248         }
249     }
250 
251     @Override
252     protected void handleOversizedMessage(final ChannelHandlerContext ctx, Object oversized) throws Exception {
253         if (oversized instanceof HttpRequest) {
254             HttpRequest request = (HttpRequest) oversized;
255             // send back a 413 and close the connection
256 
257             // If the client started to send data already, close because it's impossible to recover.
258             // If keep-alive is off and 'Expect: 100-continue' is missing, no need to leave the connection open.
259             if (oversized instanceof FullHttpMessage ||
260                     !HttpUtil.is100ContinueExpected(request) && !HttpUtil.isKeepAlive(request)) {
261                 Future<Void> future = ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE,
262                         ctx.bufferAllocator(), true, true));
263                 future.addListener(f -> {
264                     if (f.isFailed()) {
265                         logger.debug("Failed to send a 413 Request Entity Too Large.", f.cause());
266                     }
267                     ctx.close();
268                 });
269             } else {
270                 ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE,
271                                 ctx.bufferAllocator(), true, false))
272                         .addListener(future -> {
273                             if (future.isFailed()) {
274                                 logger.debug("Failed to send a 413 Request Entity Too Large.", future.cause());
275                                 ctx.close();
276                             }
277                         });
278             }
279         } else if (oversized instanceof HttpResponse) {
280             throw new ResponseTooLargeException("Response entity too large: " + oversized);
281         } else {
282             throw new IllegalStateException();
283         }
284     }
285 
286     @Override
287     public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
288         super.channelExceptionCaught(ctx, cause);
289         if (cause instanceof ResponseTooLargeException) {
290             ctx.close();
291         }
292     }
293 
294     private static FullHttpResponse newErrorResponse(HttpResponseStatus status, BufferAllocator allocator,
295                                                      boolean emptyContent, boolean closeConnection) {
296         FullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, status, allocator.allocate(0));
297         if (emptyContent) {
298             resp.headers().set(CONTENT_LENGTH, 0);
299         }
300         if (closeConnection) {
301             resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
302         }
303         return resp;
304     }
305 
306     private static final class ResponseTooLargeException extends TooLongHttpContentException {
307         ResponseTooLargeException(String message) {
308             super(message);
309         }
310     }
311 
312     private abstract static class AggregatedFullHttpMessage<R extends FullHttpMessage<R>>
313             implements FullHttpMessage<R> {
314         protected final HttpMessage message;
315         private final Buffer payload;
316         private HttpHeaders trailingHeaders;
317 
318         AggregatedFullHttpMessage(HttpMessage message, Buffer payload, HttpHeaders trailingHeaders) {
319             this.message = message;
320             this.payload = payload;
321             this.trailingHeaders = trailingHeaders;
322         }
323 
324         @Override
325         public void close() {
326             payload.close();
327         }
328 
329         @Override
330         public boolean isAccessible() {
331             return payload.isAccessible();
332         }
333 
334         @Override
335         public Buffer payload() {
336             return payload;
337         }
338 
339         @Override
340         public HttpHeaders trailingHeaders() {
341             HttpHeaders trailingHeaders = this.trailingHeaders;
342             return requireNonNullElse(trailingHeaders, EmptyHttpHeaders.INSTANCE);
343         }
344 
345         void setTrailingHeaders(HttpHeaders trailingHeaders) {
346             this.trailingHeaders = trailingHeaders;
347         }
348 
349         @Override
350         public HttpVersion getProtocolVersion() {
351             return message.protocolVersion();
352         }
353 
354         @Override
355         public HttpVersion protocolVersion() {
356             return message.protocolVersion();
357         }
358 
359         @Override
360         public FullHttpMessage<R> setProtocolVersion(HttpVersion version) {
361             message.setProtocolVersion(version);
362             return this;
363         }
364 
365         @Override
366         public HttpHeaders headers() {
367             return message.headers();
368         }
369 
370         @Override
371         public DecoderResult decoderResult() {
372             return message.decoderResult();
373         }
374 
375         @Override
376         public void setDecoderResult(DecoderResult result) {
377             message.setDecoderResult(result);
378         }
379     }
380 
381     private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage<FullHttpRequest>
382             implements FullHttpRequest {
383 
384         AggregatedFullHttpRequest(HttpRequest request, Buffer content, HttpHeaders trailingHeaders) {
385             super(request, content, trailingHeaders);
386         }
387 
388         @Override
389         public Send<FullHttpRequest> send() {
390             return payload().send().map(FullHttpRequest.class,
391                     p -> new AggregatedFullHttpRequest(this, p, trailingHeaders()));
392         }
393 
394         @Override
395         public AggregatedFullHttpRequest copy() {
396             return new AggregatedFullHttpRequest(this, payload().copy(), trailingHeaders().copy());
397         }
398 
399         @Override
400         public FullHttpRequest touch(Object hint) {
401             payload().touch(hint);
402             return this;
403         }
404 
405         @Override
406         public FullHttpRequest setMethod(HttpMethod method) {
407             ((HttpRequest) message).setMethod(method);
408             return this;
409         }
410 
411         @Override
412         public FullHttpRequest setUri(String uri) {
413             ((HttpRequest) message).setUri(uri);
414             return this;
415         }
416 
417         @Override
418         public HttpMethod method() {
419             return ((HttpRequest) message).method();
420         }
421 
422         @Override
423         public String uri() {
424             return ((HttpRequest) message).uri();
425         }
426 
427         @Override
428         public FullHttpRequest setProtocolVersion(HttpVersion version) {
429             super.setProtocolVersion(version);
430             return this;
431         }
432 
433         @Override
434         public String toString() {
435             return HttpMessageUtil.appendFullRequest(new StringBuilder(256), this).toString();
436         }
437     }
438 
439     private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage<FullHttpResponse>
440             implements FullHttpResponse {
441 
442         AggregatedFullHttpResponse(HttpResponse message, Buffer content, HttpHeaders trailingHeaders) {
443             super(message, content, trailingHeaders);
444         }
445 
446         @Override
447         public Send<FullHttpResponse> send() {
448             return payload().send().map(FullHttpResponse.class,
449                     p -> new AggregatedFullHttpResponse(this, p, trailingHeaders()));
450         }
451 
452         @Override
453         public AggregatedFullHttpResponse copy() {
454             return new AggregatedFullHttpResponse(this, payload().copy(), trailingHeaders().copy());
455         }
456 
457         @Override
458         public FullHttpResponse touch(Object hint) {
459             payload().touch(hint);
460             return this;
461         }
462 
463         @Override
464         public FullHttpResponse setStatus(HttpResponseStatus status) {
465             ((HttpResponse) message).setStatus(status);
466             return this;
467         }
468 
469         @Override
470         public HttpResponseStatus status() {
471             return ((HttpResponse) message).status();
472         }
473 
474         @Override
475         public FullHttpResponse setProtocolVersion(HttpVersion version) {
476             super.setProtocolVersion(version);
477             return this;
478         }
479 
480         @Override
481         public String toString() {
482             return HttpMessageUtil.appendFullResponse(new StringBuilder(256), this).toString();
483         }
484     }
485 }