1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.http;
17
18 import static org.jboss.netty.channel.Channels.*;
19 import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
20
21 import java.util.List;
22 import java.util.Map.Entry;
23
24 import org.jboss.netty.buffer.ChannelBuffer;
25 import org.jboss.netty.buffer.ChannelBuffers;
26 import org.jboss.netty.buffer.CompositeChannelBuffer;
27 import org.jboss.netty.channel.ChannelHandler;
28 import org.jboss.netty.channel.ChannelHandlerContext;
29 import org.jboss.netty.channel.ChannelPipeline;
30 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
31 import org.jboss.netty.channel.MessageEvent;
32 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33 import org.jboss.netty.handler.codec.frame.TooLongFrameException;
34 import org.jboss.netty.util.CharsetUtil;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public class HttpChunkAggregator extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
55 public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
56
57 private static final ChannelBuffer CONTINUE = ChannelBuffers.copiedBuffer(
58 "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII);
59
60 private final int maxContentLength;
61 private HttpMessage currentMessage;
62
63 private ChannelHandlerContext ctx;
64
65 private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
66
67
68
69
70
71
72
73
74
75 public HttpChunkAggregator(int maxContentLength) {
76 if (maxContentLength <= 0) {
77 throw new IllegalArgumentException(
78 "maxContentLength must be a positive integer: " +
79 maxContentLength);
80 }
81 this.maxContentLength = maxContentLength;
82 }
83
84
85
86
87
88
89
90 public final int getMaxCumulationBufferComponents() {
91 return maxCumulationBufferComponents;
92 }
93
94
95
96
97
98
99
100
101 public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
102 if (maxCumulationBufferComponents < 2) {
103 throw new IllegalArgumentException(
104 "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
105 " (expected: >= 2)");
106 }
107
108 if (ctx == null) {
109 this.maxCumulationBufferComponents = maxCumulationBufferComponents;
110 } else {
111 throw new IllegalStateException(
112 "decoder properties cannot be changed once the decoder is added to a pipeline.");
113 }
114 }
115
116 @Override
117 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
118 throws Exception {
119
120 Object msg = e.getMessage();
121 HttpMessage currentMessage = this.currentMessage;
122
123 if (msg instanceof HttpMessage) {
124 HttpMessage m = (HttpMessage) msg;
125
126
127
128
129
130
131 if (is100ContinueExpected(m)) {
132 write(ctx, succeededFuture(ctx.getChannel()), CONTINUE.duplicate());
133 }
134
135 if (m.isChunked()) {
136
137
138 List<String> encodings = m.getHeaders(HttpHeaders.Names.TRANSFER_ENCODING);
139 encodings.remove(HttpHeaders.Values.CHUNKED);
140 if (encodings.isEmpty()) {
141 m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
142 }
143 m.setChunked(false);
144 this.currentMessage = m;
145 } else {
146
147 this.currentMessage = null;
148 ctx.sendUpstream(e);
149 }
150 } else if (msg instanceof HttpChunk) {
151
152 if (currentMessage == null) {
153 throw new IllegalStateException(
154 "received " + HttpChunk.class.getSimpleName() +
155 " without " + HttpMessage.class.getSimpleName());
156 }
157
158
159 HttpChunk chunk = (HttpChunk) msg;
160 ChannelBuffer content = currentMessage.getContent();
161
162 if (content.readableBytes() > maxContentLength - chunk.getContent().readableBytes()) {
163
164
165
166
167 throw new TooLongFrameException(
168 "HTTP content length exceeded " + maxContentLength +
169 " bytes.");
170 }
171
172
173 appendToCumulation(chunk.getContent());
174
175 if (chunk.isLast()) {
176 this.currentMessage = null;
177
178
179 if (chunk instanceof HttpChunkTrailer) {
180 HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
181 for (Entry<String, String> header: trailer.getHeaders()) {
182 currentMessage.setHeader(header.getKey(), header.getValue());
183 }
184 }
185
186
187 currentMessage.setHeader(
188 HttpHeaders.Names.CONTENT_LENGTH,
189 String.valueOf(content.readableBytes()));
190
191
192 fireMessageReceived(ctx, currentMessage, e.getRemoteAddress());
193 }
194 } else {
195
196 ctx.sendUpstream(e);
197 }
198 }
199
200 protected void appendToCumulation(ChannelBuffer input) {
201 ChannelBuffer cumulation = currentMessage.getContent();
202 if (cumulation instanceof CompositeChannelBuffer) {
203
204 CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
205 if (composite.numComponents() >= maxCumulationBufferComponents) {
206 currentMessage.setContent(ChannelBuffers.wrappedBuffer(composite.copy(), input));
207 } else {
208 List<ChannelBuffer> decomposed = composite.decompose(0, composite.readableBytes());
209 ChannelBuffer[] buffers = decomposed.toArray(new ChannelBuffer[decomposed.size() + 1]);
210 buffers[buffers.length - 1] = input;
211
212 currentMessage.setContent(ChannelBuffers.wrappedBuffer(buffers));
213 }
214 } else {
215 currentMessage.setContent(ChannelBuffers.wrappedBuffer(cumulation, input));
216 }
217
218 }
219
220 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
221 this.ctx = ctx;
222 }
223
224 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
225
226 }
227
228 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
229
230 }
231
232 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
233
234 }
235 }