1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package org.jboss.netty.handler.codec.http;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
20  
21  import java.util.List;
22  import java.util.Map.Entry;
23  
24  import org.jboss.netty.buffer.ChannelBuffer;
25  import org.jboss.netty.buffer.ChannelBuffers;
26  import org.jboss.netty.buffer.CompositeChannelBuffer;
27  import org.jboss.netty.channel.ChannelHandler;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33  import org.jboss.netty.handler.codec.frame.TooLongFrameException;
34  import org.jboss.netty.util.CharsetUtil;
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  public class HttpChunkAggregator extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
55      public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
56  
57      private static final ChannelBuffer CONTINUE = ChannelBuffers.copiedBuffer(
58              "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII);
59  
60      private final int maxContentLength;
61      private HttpMessage currentMessage;
62      private boolean tooLongFrameFound;
63      private ChannelHandlerContext ctx;
64  
65      private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
66  
67      
68  
69  
70  
71  
72  
73  
74  
75      public HttpChunkAggregator(int maxContentLength) {
76          if (maxContentLength <= 0) {
77              throw new IllegalArgumentException(
78                      "maxContentLength must be a positive integer: " +
79                      maxContentLength);
80          }
81          this.maxContentLength = maxContentLength;
82      }
83  
84      
85  
86  
87  
88  
89  
90      public final int getMaxCumulationBufferComponents() {
91          return maxCumulationBufferComponents;
92      }
93  
94      
95  
96  
97  
98  
99  
100 
101     public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
102         if (maxCumulationBufferComponents < 2) {
103             throw new IllegalArgumentException(
104                     "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
105                     " (expected: >= 2)");
106         }
107 
108         if (ctx == null) {
109             this.maxCumulationBufferComponents = maxCumulationBufferComponents;
110         } else {
111             throw new IllegalStateException(
112                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
113         }
114     }
115 
116     @Override
117     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
118             throws Exception {
119 
120         Object msg = e.getMessage();
121         HttpMessage currentMessage = this.currentMessage;
122 
123         if (msg instanceof HttpMessage) {
124             HttpMessage m = (HttpMessage) msg;
125             tooLongFrameFound = false;
126 
127             
128             
129             
130             
131             
132             if (is100ContinueExpected(m)) {
133                 write(ctx, succeededFuture(ctx.getChannel()), CONTINUE.duplicate());
134             }
135 
136             if (m.isChunked()) {
137                 
138                 
139                 HttpCodecUtil.removeTransferEncodingChunked(m);
140                 m.setChunked(false);
141                 this.currentMessage = m;
142             } else {
143                 
144                 this.currentMessage = null;
145                 ctx.sendUpstream(e);
146             }
147         } else if (msg instanceof HttpChunk) {
148             
149             if (currentMessage == null) {
150                 throw new IllegalStateException(
151                         "received " + HttpChunk.class.getSimpleName() +
152                         " without " + HttpMessage.class.getSimpleName());
153             }
154             HttpChunk chunk = (HttpChunk) msg;
155 
156             if (tooLongFrameFound) {
157                 if (chunk.isLast()) {
158                     this.currentMessage = null;
159                 }
160                 return;
161             }
162 
163             
164             ChannelBuffer content = currentMessage.getContent();
165 
166             if (content.readableBytes() > maxContentLength - chunk.getContent().readableBytes()) {
167                 tooLongFrameFound = true;
168 
169                 throw new TooLongFrameException(
170                         "HTTP content length exceeded " + maxContentLength +
171                         " bytes.");
172             }
173 
174             
175             appendToCumulation(chunk.getContent());
176 
177             if (chunk.isLast()) {
178                 this.currentMessage = null;
179 
180                 
181                 if (chunk instanceof HttpChunkTrailer) {
182                     HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
183                     for (Entry<String, String> header: trailer.trailingHeaders()) {
184                         currentMessage.headers().set(header.getKey(), header.getValue());
185                     }
186                 }
187 
188                 
189                 currentMessage.headers().set(
190                         HttpHeaders.Names.CONTENT_LENGTH,
191                         String.valueOf(content.readableBytes()));
192 
193                 
194                 fireMessageReceived(ctx, currentMessage, e.getRemoteAddress());
195             }
196         } else {
197             
198             ctx.sendUpstream(e);
199         }
200     }
201 
202     protected void appendToCumulation(ChannelBuffer input) {
203         ChannelBuffer cumulation = currentMessage.getContent();
204         if (cumulation instanceof CompositeChannelBuffer) {
205             
206             CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
207             if (composite.numComponents() >= maxCumulationBufferComponents) {
208                 currentMessage.setContent(ChannelBuffers.wrappedBuffer(composite.copy(), input));
209             } else {
210                 List<ChannelBuffer> decomposed = composite.decompose(0, composite.readableBytes());
211                 ChannelBuffer[] buffers = decomposed.toArray(new ChannelBuffer[decomposed.size() + 1]);
212                 buffers[buffers.length - 1] = input;
213 
214                 currentMessage.setContent(ChannelBuffers.wrappedBuffer(buffers));
215             }
216         } else {
217             currentMessage.setContent(ChannelBuffers.wrappedBuffer(cumulation, input));
218         }
219     }
220 
221     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
222         this.ctx = ctx;
223     }
224 
225     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
226         
227     }
228 
229     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
230         
231     }
232 
233     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
234         
235     }
236 }