View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.http;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
20  
21  import java.util.List;
22  import java.util.Map.Entry;
23  
24  import org.jboss.netty.buffer.ChannelBuffer;
25  import org.jboss.netty.buffer.ChannelBuffers;
26  import org.jboss.netty.buffer.CompositeChannelBuffer;
27  import org.jboss.netty.channel.ChannelHandler;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33  import org.jboss.netty.handler.codec.frame.TooLongFrameException;
34  import org.jboss.netty.util.CharsetUtil;
35  
36  /**
37   * A {@link ChannelHandler} that aggregates an {@link HttpMessage}
38   * and its following {@link HttpChunk}s into a single {@link HttpMessage} with
39   * no following {@link HttpChunk}s.  It is useful when you don't want to take
40   * care of HTTP messages whose transfer encoding is 'chunked'.  Insert this
41   * handler after {@link HttpMessageDecoder} in the {@link ChannelPipeline}:
42   * <pre>
43   * {@link ChannelPipeline} p = ...;
44   * ...
45   * p.addLast("decoder", new {@link HttpRequestDecoder}());
46   * p.addLast("aggregator", <b>new {@link HttpChunkAggregator}(1048576)</b>);
47   * ...
48   * p.addLast("encoder", new {@link HttpResponseEncoder}());
49   * p.addLast("handler", new HttpRequestHandler());
50   * </pre>
51   * @apiviz.landmark
52   * @apiviz.has org.jboss.netty.handler.codec.http.HttpChunk oneway - - filters out
53   */
54  public class HttpChunkAggregator extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
55      public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
56  
57      private static final ChannelBuffer CONTINUE = ChannelBuffers.copiedBuffer(
58              "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII);
59  
60      private final int maxContentLength;
61      private HttpMessage currentMessage;
62  
63      private ChannelHandlerContext ctx;
64  
65      private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
66  
67      /**
68       * Creates a new instance.
69       *
70       * @param maxContentLength
71       *        the maximum length of the aggregated content.
72       *        If the length of the aggregated content exceeds this value,
73       *        a {@link TooLongFrameException} will be raised.
74       */
75      public HttpChunkAggregator(int maxContentLength) {
76          if (maxContentLength <= 0) {
77              throw new IllegalArgumentException(
78                      "maxContentLength must be a positive integer: " +
79                      maxContentLength);
80          }
81          this.maxContentLength = maxContentLength;
82      }
83  
84      /**
85       * Returns the maximum number of components in the cumulation buffer.  If the number of
86       * the components in the cumulation buffer exceeds this value, the components of the
87       * cumulation buffer are consolidated into a single component, involving memory copies.
88       * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
89       */
90      public final int getMaxCumulationBufferComponents() {
91          return maxCumulationBufferComponents;
92      }
93  
94      /**
95       * Sets the maximum number of components in the cumulation buffer.  If the number of
96       * the components in the cumulation buffer exceeds this value, the components of the
97       * cumulation buffer are consolidated into a single component, involving memory copies.
98       * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
99       * and its minimum allowed value is {@code 2}.
100      */
101     public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
102         if (maxCumulationBufferComponents < 2) {
103             throw new IllegalArgumentException(
104                     "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
105                     " (expected: >= 2)");
106         }
107 
108         if (ctx == null) {
109             this.maxCumulationBufferComponents = maxCumulationBufferComponents;
110         } else {
111             throw new IllegalStateException(
112                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
113         }
114     }
115 
116     @Override
117     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
118             throws Exception {
119 
120         Object msg = e.getMessage();
121         HttpMessage currentMessage = this.currentMessage;
122 
123         if (msg instanceof HttpMessage) {
124             HttpMessage m = (HttpMessage) msg;
125 
126             // Handle the 'Expect: 100-continue' header if necessary.
127             // TODO: Respond with 413 Request Entity Too Large
128             //   and discard the traffic or close the connection.
129             //       No need to notify the upstream handlers - just log.
130             //       If decoding a response, just throw an exception.
131             if (is100ContinueExpected(m)) {
132                 write(ctx, succeededFuture(ctx.getChannel()), CONTINUE.duplicate());
133             }
134 
135             if (m.isChunked()) {
136                 // A chunked message - remove 'Transfer-Encoding' header,
137                 // initialize the cumulative buffer, and wait for incoming chunks.
138                 List<String> encodings = m.getHeaders(HttpHeaders.Names.TRANSFER_ENCODING);
139                 encodings.remove(HttpHeaders.Values.CHUNKED);
140                 if (encodings.isEmpty()) {
141                     m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
142                 }
143                 m.setChunked(false);
144                 this.currentMessage = m;
145             } else {
146                 // Not a chunked message - pass through.
147                 this.currentMessage = null;
148                 ctx.sendUpstream(e);
149             }
150         } else if (msg instanceof HttpChunk) {
151             // Sanity check
152             if (currentMessage == null) {
153                 throw new IllegalStateException(
154                         "received " + HttpChunk.class.getSimpleName() +
155                         " without " + HttpMessage.class.getSimpleName());
156             }
157 
158             // Merge the received chunk into the content of the current message.
159             HttpChunk chunk = (HttpChunk) msg;
160             ChannelBuffer content = currentMessage.getContent();
161 
162             if (content.readableBytes() > maxContentLength - chunk.getContent().readableBytes()) {
163                 // TODO: Respond with 413 Request Entity Too Large
164                 //   and discard the traffic or close the connection.
165                 //       No need to notify the upstream handlers - just log.
166                 //       If decoding a response, just throw an exception.
167                 throw new TooLongFrameException(
168                         "HTTP content length exceeded " + maxContentLength +
169                         " bytes.");
170             }
171 
172             // Append the content of the chunk
173             appendToCumulation(chunk.getContent());
174 
175             if (chunk.isLast()) {
176                 this.currentMessage = null;
177 
178                 // Merge trailing headers into the message.
179                 if (chunk instanceof HttpChunkTrailer) {
180                     HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
181                     for (Entry<String, String> header: trailer.getHeaders()) {
182                         currentMessage.setHeader(header.getKey(), header.getValue());
183                     }
184                 }
185 
186                 // Set the 'Content-Length' header.
187                 currentMessage.setHeader(
188                         HttpHeaders.Names.CONTENT_LENGTH,
189                         String.valueOf(content.readableBytes()));
190 
191                 // All done - generate the event.
192                 fireMessageReceived(ctx, currentMessage, e.getRemoteAddress());
193             }
194         } else {
195             // Neither HttpMessage or HttpChunk
196             ctx.sendUpstream(e);
197         }
198     }
199 
200     protected void appendToCumulation(ChannelBuffer input) {
201         ChannelBuffer cumulation = currentMessage.getContent();
202         if (cumulation instanceof CompositeChannelBuffer) {
203             // Make sure the resulting cumulation buffer has no more than the configured components.
204             CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
205             if (composite.numComponents() >= maxCumulationBufferComponents) {
206                 currentMessage.setContent(ChannelBuffers.wrappedBuffer(composite.copy(), input));
207             } else {
208                 List<ChannelBuffer> decomposed = composite.decompose(0, composite.readableBytes());
209                 ChannelBuffer[] buffers = decomposed.toArray(new ChannelBuffer[decomposed.size() + 1]);
210                 buffers[buffers.length - 1] = input;
211 
212                 currentMessage.setContent(ChannelBuffers.wrappedBuffer(buffers));
213             }
214         } else {
215             currentMessage.setContent(ChannelBuffers.wrappedBuffer(cumulation, input));
216         }
217 
218     }
219 
220     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
221         this.ctx = ctx;
222     }
223 
224     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
225         // noop
226     }
227 
228     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
229         // noop
230     }
231 
232     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
233         // noop
234     }
235 }