View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.handler.codec.DecoderResult;
28  import io.netty.handler.codec.MessageToMessageDecoder;
29  import io.netty.handler.codec.TooLongFrameException;
30  import io.netty.handler.codec.http.HttpHeaders.Names;
31  
32  import java.util.List;
33  
34  import static io.netty.handler.codec.http.HttpHeaders.is100ContinueExpected;
35  import static io.netty.handler.codec.http.HttpHeaders.isContentLengthSet;
36  import static io.netty.handler.codec.http.HttpHeaders.removeTransferEncodingChunked;
37  
38  /**
39   * A {@link ChannelHandler} that aggregates an {@link HttpMessage}
40   * and its following {@link HttpContent}s into a single {@link FullHttpRequest}
41   * or {@link FullHttpResponse} (depending on if it used to handle requests or responses)
42   * with no following {@link HttpContent}s.  It is useful when you don't want to take
43   * care of HTTP messages whose transfer encoding is 'chunked'.  Insert this
44   * handler after {@link HttpResponseDecoder} in the {@link ChannelPipeline} if being used to handle
45   * responses, or after {@link HttpRequestDecoder} and {@link HttpResponseEncoder} in the
46   * {@link ChannelPipeline} if being used to handle requests.
47   * <blockquote>
48   *  <pre>
49   *  {@link ChannelPipeline} p = ...;
50   *  ...
51   *  p.addLast("decoder", <b>new {@link HttpRequestDecoder}()</b>);
52   *  p.addLast("encoder", <b>new {@link HttpResponseEncoder}()</b>);
53   *  p.addLast("aggregator", <b>new {@link HttpObjectAggregator}(1048576)</b>);
54   *  ...
55   *  p.addLast("handler", new HttpRequestHandler());
56   *  </pre>
57   * </blockquote>
58   * <p>
59   * For convenience, consider putting a {@link HttpServerCodec} before the {@link HttpObjectAggregator}
60   * as it functions as both a {@link HttpRequestDecoder} and a {@link HttpResponseEncoder}.
61   * </p>
62   * Be aware that {@link HttpObjectAggregator} may end up sending a {@link HttpResponse}:
63   * <table border summary="Possible Responses">
64   *   <tbody>
65   *     <tr>
66   *       <th>Response Status</th>
67   *       <th>Condition When Sent</th>
68   *     </tr>
69   *     <tr>
70   *       <td>100 Continue</td>
71   *       <td>A '100-continue' expectation is received and the 'content-length' doesn't exceed maxContentLength</td>
72   *     </tr>
73   *     <tr>
74   *       <td>417 Expectation Failed</td>
75   *       <td>A '100-continue' expectation is received and the 'content-length' exceeds maxContentLength</td>
76   *     </tr>
77   *     <tr>
78   *       <td>413 Request Entity Too Large</td>
79   *       <td>Either the 'content-length' or the bytes received so far exceed maxContentLength</td>
80   *     </tr>
81   *   </tbody>
82   * </table>
83   *
84   * @see FullHttpRequest
85   * @see FullHttpResponse
86   * @see HttpResponseDecoder
87   * @see HttpServerCodec
88   */
89  public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
90      public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
91      private static final FullHttpResponse CONTINUE =
92              new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);
93      private static final FullHttpResponse EXPECTATION_FAILED = new DefaultFullHttpResponse(
94              HttpVersion.HTTP_1_1, HttpResponseStatus.EXPECTATION_FAILED, Unpooled.EMPTY_BUFFER);
95  
96      static {
97          HttpHeaders.setContentLength(EXPECTATION_FAILED, 0);
98      }
99  
100     private final int maxContentLength;
101     private AggregatedFullHttpMessage currentMessage;
102     private final boolean closeOnExpectationFailed;
103 
104     private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
105     private ChannelHandlerContext ctx;
106 
107     /**
108      * Creates a new instance.
109      *
110      * @param maxContentLength
111      *        the maximum length of the aggregated content in bytes.
112      *        If the length of the aggregated content exceeds this value,
113      *        a {@link TooLongFrameException} will be raised.
114      */
115     public HttpObjectAggregator(int maxContentLength) {
116         this(maxContentLength, false);
117     }
118 
119     /**
120      * Creates a new instance.
121      * @param maxContentLength
122      *        the maximum length of the aggregated content in bytes.
123      *        If the length of the aggregated content exceeds this value,
124      *        a {@link TooLongFrameException} will be raised.
125      * @param closeOnExpectationFailed If a 100-continue response is detected but the content length is too large
126      * then {@code true} means close the connection. otherwise the connection will remain open and data will be
127      * consumed and discarded until the next request is received.
128      */
129     public HttpObjectAggregator(int maxContentLength, boolean closeOnExpectationFailed) {
130         if (maxContentLength <= 0) {
131             throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength);
132         }
133         this.maxContentLength = maxContentLength;
134         this.closeOnExpectationFailed = closeOnExpectationFailed;
135     }
136     /**
137      * Returns the maximum number of components in the cumulation buffer.  If the number of
138      * the components in the cumulation buffer exceeds this value, the components of the
139      * cumulation buffer are consolidated into a single component, involving memory copies.
140      * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
141      */
142     public final int getMaxCumulationBufferComponents() {
143         return maxCumulationBufferComponents;
144     }
145 
146     /**
147      * Sets the maximum number of components in the cumulation buffer.  If the number of
148      * the components in the cumulation buffer exceeds this value, the components of the
149      * cumulation buffer are consolidated into a single component, involving memory copies.
150      * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
151      * and its minimum allowed value is {@code 2}.
152      */
153     public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
154         if (maxCumulationBufferComponents < 2) {
155             throw new IllegalArgumentException(
156                     "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
157                     " (expected: >= 2)");
158         }
159 
160         if (ctx == null) {
161             this.maxCumulationBufferComponents = maxCumulationBufferComponents;
162         } else {
163             throw new IllegalStateException(
164                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
165         }
166     }
167 
168     @Override
169     protected void decode(final ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
170         if (msg instanceof HttpMessage) {
171             if (currentMessage != null) {
172                 currentMessage.release();
173                 currentMessage = null;
174                 throw new IllegalStateException("Start of new message received before existing message completed.");
175             }
176             HttpMessage m = (HttpMessage) msg;
177 
178             // Handle the 'Expect: 100-continue' header if necessary.
179             if (is100ContinueExpected(m)) {
180                 if (HttpHeaders.getContentLength(m, 0) > maxContentLength) {
181                     final ChannelFuture future = ctx.writeAndFlush(EXPECTATION_FAILED.duplicate().retain());
182                     future.addListener(new ChannelFutureListener() {
183                         @Override
184                         public void operationComplete(ChannelFuture future) throws Exception {
185                             if (!future.isSuccess()) {
186                                 ctx.fireExceptionCaught(future.cause());
187                             }
188                         }
189                     });
190                     if (closeOnExpectationFailed) {
191                         future.addListener(ChannelFutureListener.CLOSE);
192                     }
193                     ctx.pipeline().fireUserEventTriggered(HttpExpectationFailedEvent.INSTANCE);
194                     return;
195                 }
196                 ctx.writeAndFlush(CONTINUE.duplicate().retain()).addListener(new ChannelFutureListener() {
197                     @Override
198                     public void operationComplete(ChannelFuture future) throws Exception {
199                         if (!future.isSuccess()) {
200                             ctx.fireExceptionCaught(future.cause());
201                         }
202                     }
203                 });
204             }
205 
206             if (!m.getDecoderResult().isSuccess()) {
207                 removeTransferEncodingChunked(m);
208                 out.add(toFullMessage(m));
209                 return;
210             }
211             if (msg instanceof HttpRequest) {
212                 HttpRequest header = (HttpRequest) msg;
213                 currentMessage = new AggregatedFullHttpRequest(
214                         header, ctx.alloc().compositeBuffer(maxCumulationBufferComponents), null);
215             } else if (msg instanceof HttpResponse) {
216                 HttpResponse header = (HttpResponse) msg;
217                 currentMessage = new AggregatedFullHttpResponse(
218                         header, ctx.alloc().compositeBuffer(maxCumulationBufferComponents), null);
219             } else {
220                 throw new Error();
221             }
222 
223             // A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
224             removeTransferEncodingChunked(currentMessage);
225         } else if (msg instanceof HttpContent) {
226             if (currentMessage == null) {
227                 // it is possible that a TooLongFrameException was already thrown but we can still discard data
228                 // until the begging of the next request/response.
229                 return;
230             }
231 
232             // Merge the received chunk into the content of the current message.
233             HttpContent chunk = (HttpContent) msg;
234             CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
235 
236             if (content.readableBytes() > maxContentLength - chunk.content().readableBytes()) {
237                 // release current message to prevent leaks
238                 currentMessage.release();
239                 currentMessage = null;
240 
241                 throw new TooLongFrameException(
242                         "HTTP content length exceeded " + maxContentLength +
243                         " bytes.");
244             }
245 
246             // Append the content of the chunk
247             if (chunk.content().isReadable()) {
248                 content.addComponent(true, chunk.content().retain());
249             }
250 
251             final boolean last;
252             if (!chunk.getDecoderResult().isSuccess()) {
253                 currentMessage.setDecoderResult(
254                         DecoderResult.failure(chunk.getDecoderResult().cause()));
255                 last = true;
256             } else {
257                 last = chunk instanceof LastHttpContent;
258             }
259 
260             if (last) {
261                 // Merge trailing headers into the message.
262                 if (chunk instanceof LastHttpContent) {
263                     LastHttpContent trailer = (LastHttpContent) chunk;
264                     currentMessage.setTrailingHeaders(trailer.trailingHeaders());
265                 } else {
266                     currentMessage.setTrailingHeaders(new DefaultHttpHeaders());
267                 }
268 
269                 // Set the 'Content-Length' header. If one isn't already set.
270                 // This is important as HEAD responses will use a 'Content-Length' header which
271                 // does not match the actual body, but the number of bytes that would be
272                 // transmitted if a GET would have been used.
273                 //
274                 // See rfc2616 14.13 Content-Length
275                 if (!isContentLengthSet(currentMessage)) {
276                     currentMessage.headers().set(
277                             Names.CONTENT_LENGTH,
278                             String.valueOf(content.readableBytes()));
279                 }
280                 // Set our currentMessage member variable to null in case adding to out will cause re-entry.
281                 AggregatedFullHttpMessage currentMessage = this.currentMessage;
282                 this.currentMessage = null;
283                 out.add(currentMessage);
284             }
285         } else {
286             throw new Error();
287         }
288     }
289 
290     @Override
291     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
292         try {
293             super.channelInactive(ctx);
294         } finally {
295             // release current message if it is not null as it may be a left-over
296             releaseCurrentMessage();
297         }
298     }
299 
300     @Override
301     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
302         this.ctx = ctx;
303     }
304 
305     @Override
306     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
307         try {
308             super.handlerRemoved(ctx);
309         } finally {
310             // release current message if it is not null as it may be a left-over as there is not much more we can do in
311             // this case
312             releaseCurrentMessage();
313         }
314     }
315 
316     private void releaseCurrentMessage() {
317         if (currentMessage != null) {
318             currentMessage.release();
319             currentMessage = null;
320         }
321     }
322 
323     private static FullHttpMessage toFullMessage(HttpMessage msg) {
324         if (msg instanceof FullHttpMessage) {
325             return ((FullHttpMessage) msg).retain();
326         }
327 
328         FullHttpMessage fullMsg;
329         if (msg instanceof HttpRequest) {
330             fullMsg = new AggregatedFullHttpRequest(
331                     (HttpRequest) msg, Unpooled.EMPTY_BUFFER, new DefaultHttpHeaders());
332         } else if (msg instanceof HttpResponse) {
333             fullMsg = new AggregatedFullHttpResponse(
334                     (HttpResponse) msg, Unpooled.EMPTY_BUFFER, new DefaultHttpHeaders());
335         } else {
336             throw new IllegalStateException();
337         }
338 
339         return fullMsg;
340     }
341 
342     private abstract static class AggregatedFullHttpMessage implements ByteBufHolder, FullHttpMessage {
343         protected final HttpMessage message;
344         private final ByteBuf content;
345         private HttpHeaders trailingHeaders;
346 
347         AggregatedFullHttpMessage(HttpMessage message, ByteBuf content, HttpHeaders trailingHeaders) {
348             this.message = message;
349             this.content = content;
350             this.trailingHeaders = trailingHeaders;
351         }
352 
353         @Override
354         public HttpHeaders trailingHeaders() {
355             HttpHeaders trailingHeaders = this.trailingHeaders;
356             if (trailingHeaders == null) {
357                 return HttpHeaders.EMPTY_HEADERS;
358             } else {
359                 return trailingHeaders;
360             }
361         }
362 
363         void setTrailingHeaders(HttpHeaders trailingHeaders) {
364             this.trailingHeaders = trailingHeaders;
365         }
366 
367         @Override
368         public HttpVersion getProtocolVersion() {
369             return message.getProtocolVersion();
370         }
371 
372         @Override
373         public FullHttpMessage setProtocolVersion(HttpVersion version) {
374             message.setProtocolVersion(version);
375             return this;
376         }
377 
378         @Override
379         public HttpHeaders headers() {
380             return message.headers();
381         }
382 
383         @Override
384         public DecoderResult getDecoderResult() {
385             return message.getDecoderResult();
386         }
387 
388         @Override
389         public void setDecoderResult(DecoderResult result) {
390             message.setDecoderResult(result);
391         }
392 
393         @Override
394         public ByteBuf content() {
395             return content;
396         }
397 
398         @Override
399         public int refCnt() {
400             return content.refCnt();
401         }
402 
403         @Override
404         public FullHttpMessage retain() {
405             content.retain();
406             return this;
407         }
408 
409         @Override
410         public FullHttpMessage retain(int increment) {
411             content.retain(increment);
412             return this;
413         }
414 
415         @Override
416         public boolean release() {
417             return content.release();
418         }
419 
420         @Override
421         public boolean release(int decrement) {
422             return content.release(decrement);
423         }
424 
425         @Override
426         public abstract FullHttpMessage copy();
427 
428         @Override
429         public abstract FullHttpMessage duplicate();
430     }
431 
432     private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage implements FullHttpRequest {
433 
434         AggregatedFullHttpRequest(HttpRequest request, ByteBuf content, HttpHeaders trailingHeaders) {
435             super(request, content, trailingHeaders);
436         }
437 
438         @Override
439         public FullHttpRequest copy() {
440             DefaultFullHttpRequest copy = new DefaultFullHttpRequest(
441                     getProtocolVersion(), getMethod(), getUri(), content().copy());
442             copy.headers().set(headers());
443             copy.trailingHeaders().set(trailingHeaders());
444             return copy;
445         }
446 
447         @Override
448         public FullHttpRequest duplicate() {
449             DefaultFullHttpRequest duplicate = new DefaultFullHttpRequest(
450                     getProtocolVersion(), getMethod(), getUri(), content().duplicate());
451             duplicate.headers().set(headers());
452             duplicate.trailingHeaders().set(trailingHeaders());
453             return duplicate;
454         }
455 
456         @Override
457         public FullHttpRequest retain(int increment) {
458             super.retain(increment);
459             return this;
460         }
461 
462         @Override
463         public FullHttpRequest retain() {
464             super.retain();
465             return this;
466         }
467 
468         @Override
469         public FullHttpRequest setMethod(HttpMethod method) {
470             ((HttpRequest) message).setMethod(method);
471             return this;
472         }
473 
474         @Override
475         public FullHttpRequest setUri(String uri) {
476             ((HttpRequest) message).setUri(uri);
477             return this;
478         }
479 
480         @Override
481         public HttpMethod getMethod() {
482             return ((HttpRequest) message).getMethod();
483         }
484 
485         @Override
486         public String getUri() {
487             return ((HttpRequest) message).getUri();
488         }
489 
490         @Override
491         public FullHttpRequest setProtocolVersion(HttpVersion version) {
492             super.setProtocolVersion(version);
493             return this;
494         }
495 
496         @Override
497         public String toString() {
498             return HttpMessageUtil.appendFullRequest(new StringBuilder(256), this).toString();
499         }
500     }
501 
502     private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage
503             implements FullHttpResponse {
504 
505         AggregatedFullHttpResponse(HttpResponse message, ByteBuf content, HttpHeaders trailingHeaders) {
506             super(message, content, trailingHeaders);
507         }
508 
509         @Override
510         public FullHttpResponse copy() {
511             DefaultFullHttpResponse copy = new DefaultFullHttpResponse(
512                     getProtocolVersion(), getStatus(), content().copy());
513             copy.headers().set(headers());
514             copy.trailingHeaders().set(trailingHeaders());
515             return copy;
516         }
517 
518         @Override
519         public FullHttpResponse duplicate() {
520             DefaultFullHttpResponse duplicate = new DefaultFullHttpResponse(getProtocolVersion(), getStatus(),
521                     content().duplicate());
522             duplicate.headers().set(headers());
523             duplicate.trailingHeaders().set(trailingHeaders());
524             return duplicate;
525         }
526 
527         @Override
528         public FullHttpResponse setStatus(HttpResponseStatus status) {
529             ((HttpResponse) message).setStatus(status);
530             return this;
531         }
532 
533         @Override
534         public HttpResponseStatus getStatus() {
535             return ((HttpResponse) message).getStatus();
536         }
537 
538         @Override
539         public FullHttpResponse setProtocolVersion(HttpVersion version) {
540             super.setProtocolVersion(version);
541             return this;
542         }
543 
544         @Override
545         public FullHttpResponse retain(int increment) {
546             super.retain(increment);
547             return this;
548         }
549 
550         @Override
551         public FullHttpResponse retain() {
552             super.retain();
553             return this;
554         }
555 
556         @Override
557         public String toString() {
558             return HttpMessageUtil.appendFullResponse(new StringBuilder(256), this).toString();
559         }
560     }
561 }