1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufHolder;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandler;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.handler.codec.DecoderResult;
28 import io.netty.handler.codec.MessageToMessageDecoder;
29 import io.netty.handler.codec.TooLongFrameException;
30 import io.netty.handler.codec.http.HttpHeaders.Names;
31
32 import java.util.List;
33
34 import static io.netty.handler.codec.http.HttpHeaders.is100ContinueExpected;
35 import static io.netty.handler.codec.http.HttpHeaders.isContentLengthSet;
36 import static io.netty.handler.codec.http.HttpHeaders.removeTransferEncodingChunked;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
/**
 * Default value of {@code maxCumulationBufferComponents}: the component limit handed to
 * {@code compositeBuffer(int)} when the buffer of an aggregated message is allocated.
 */
public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;

/** Template for the {@code 100 Continue} reply; a retained duplicate is written per request. */
private static final FullHttpResponse CONTINUE = new DefaultFullHttpResponse(
        HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);

/**
 * Template for the {@code 417 Expectation Failed} reply written when a message that expects
 * {@code 100 Continue} declares more content than {@code maxContentLength}.
 */
private static final FullHttpResponse EXPECTATION_FAILED = new DefaultFullHttpResponse(
        HttpVersion.HTTP_1_1, HttpResponseStatus.EXPECTATION_FAILED, Unpooled.EMPTY_BUFFER);

static {
    // The rejection carries no body, so advertise an explicit zero Content-Length.
    HttpHeaders.setContentLength(EXPECTATION_FAILED, 0);
}

/** Upper bound, in bytes, on the content of one aggregated message. */
private final int maxContentLength;

/** Whether the channel is closed once the Expectation Failed reply has been written. */
private final boolean closeOnExpectationFailed;

/** Component limit for the composite buffer; only changeable before {@code handlerAdded}. */
private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;

/** Message being assembled, or {@code null} when no aggregation is in progress. */
private AggregatedFullHttpMessage currentMessage;

/** Context captured in {@code handlerAdded}; non-null means the configuration is frozen. */
private ChannelHandlerContext ctx;
106
107
108
109
110
111
112
113
114
/**
 * Creates an aggregator that leaves the channel open after an Expectation Failed reply.
 * Equivalent to {@code new HttpObjectAggregator(maxContentLength, false)}.
 *
 * @param maxContentLength maximum content length, in bytes, of an aggregated message;
 *                         must be positive
 * @throws IllegalArgumentException if {@code maxContentLength} is not positive
 */
public HttpObjectAggregator(int maxContentLength) {
    this(maxContentLength, false);
}
118
119
120
121
122
123
124
125
126
127
128
129 public HttpObjectAggregator(int maxContentLength, boolean closeOnExpectationFailed) {
130 if (maxContentLength <= 0) {
131 throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength);
132 }
133 this.maxContentLength = maxContentLength;
134 this.closeOnExpectationFailed = closeOnExpectationFailed;
135 }
136
137
138
139
140
141
142 public final int getMaxCumulationBufferComponents() {
143 return maxCumulationBufferComponents;
144 }
145
146
147
148
149
150
151
152
153 public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
154 if (maxCumulationBufferComponents < 2) {
155 throw new IllegalArgumentException(
156 "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
157 " (expected: >= 2)");
158 }
159
160 if (ctx == null) {
161 this.maxCumulationBufferComponents = maxCumulationBufferComponents;
162 } else {
163 throw new IllegalStateException(
164 "decoder properties cannot be changed once the decoder is added to a pipeline.");
165 }
166 }
167
168 @Override
169 protected void decode(final ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
170 if (msg instanceof HttpMessage) {
171 if (currentMessage != null) {
172 currentMessage.release();
173 currentMessage = null;
174 throw new IllegalStateException("Start of new message received before existing message completed.");
175 }
176 HttpMessage m = (HttpMessage) msg;
177
178
179 if (is100ContinueExpected(m)) {
180 if (HttpHeaders.getContentLength(m, 0) > maxContentLength) {
181 final ChannelFuture future = ctx.writeAndFlush(EXPECTATION_FAILED.duplicate().retain());
182 future.addListener(new ChannelFutureListener() {
183 @Override
184 public void operationComplete(ChannelFuture future) throws Exception {
185 if (!future.isSuccess()) {
186 ctx.fireExceptionCaught(future.cause());
187 }
188 }
189 });
190 if (closeOnExpectationFailed) {
191 future.addListener(ChannelFutureListener.CLOSE);
192 }
193 ctx.pipeline().fireUserEventTriggered(HttpExpectationFailedEvent.INSTANCE);
194 return;
195 }
196 ctx.writeAndFlush(CONTINUE.duplicate().retain()).addListener(new ChannelFutureListener() {
197 @Override
198 public void operationComplete(ChannelFuture future) throws Exception {
199 if (!future.isSuccess()) {
200 ctx.fireExceptionCaught(future.cause());
201 }
202 }
203 });
204 }
205
206 if (!m.getDecoderResult().isSuccess()) {
207 removeTransferEncodingChunked(m);
208 out.add(toFullMessage(m));
209 return;
210 }
211 if (msg instanceof HttpRequest) {
212 HttpRequest header = (HttpRequest) msg;
213 currentMessage = new AggregatedFullHttpRequest(
214 header, ctx.alloc().compositeBuffer(maxCumulationBufferComponents), null);
215 } else if (msg instanceof HttpResponse) {
216 HttpResponse header = (HttpResponse) msg;
217 currentMessage = new AggregatedFullHttpResponse(
218 header, ctx.alloc().compositeBuffer(maxCumulationBufferComponents), null);
219 } else {
220 throw new Error();
221 }
222
223
224 removeTransferEncodingChunked(currentMessage);
225 } else if (msg instanceof HttpContent) {
226 if (currentMessage == null) {
227
228
229 return;
230 }
231
232
233 HttpContent chunk = (HttpContent) msg;
234 CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
235
236 if (content.readableBytes() > maxContentLength - chunk.content().readableBytes()) {
237
238 currentMessage.release();
239 currentMessage = null;
240
241 throw new TooLongFrameException(
242 "HTTP content length exceeded " + maxContentLength +
243 " bytes.");
244 }
245
246
247 if (chunk.content().isReadable()) {
248 content.addComponent(true, chunk.content().retain());
249 }
250
251 final boolean last;
252 if (!chunk.getDecoderResult().isSuccess()) {
253 currentMessage.setDecoderResult(
254 DecoderResult.failure(chunk.getDecoderResult().cause()));
255 last = true;
256 } else {
257 last = chunk instanceof LastHttpContent;
258 }
259
260 if (last) {
261
262 if (chunk instanceof LastHttpContent) {
263 LastHttpContent trailer = (LastHttpContent) chunk;
264 currentMessage.setTrailingHeaders(trailer.trailingHeaders());
265 } else {
266 currentMessage.setTrailingHeaders(new DefaultHttpHeaders());
267 }
268
269
270
271
272
273
274
275 if (!isContentLengthSet(currentMessage)) {
276 currentMessage.headers().set(
277 Names.CONTENT_LENGTH,
278 String.valueOf(content.readableBytes()));
279 }
280
281 AggregatedFullHttpMessage currentMessage = this.currentMessage;
282 this.currentMessage = null;
283 out.add(currentMessage);
284 }
285 } else {
286 throw new Error();
287 }
288 }
289
290 @Override
291 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
292 try {
293 super.channelInactive(ctx);
294 } finally {
295
296 releaseCurrentMessage();
297 }
298 }
299
300 @Override
301 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
302 this.ctx = ctx;
303 }
304
305 @Override
306 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
307 try {
308 super.handlerRemoved(ctx);
309 } finally {
310
311
312 releaseCurrentMessage();
313 }
314 }
315
316 private void releaseCurrentMessage() {
317 if (currentMessage != null) {
318 currentMessage.release();
319 currentMessage = null;
320 }
321 }
322
323 private static FullHttpMessage toFullMessage(HttpMessage msg) {
324 if (msg instanceof FullHttpMessage) {
325 return ((FullHttpMessage) msg).retain();
326 }
327
328 FullHttpMessage fullMsg;
329 if (msg instanceof HttpRequest) {
330 fullMsg = new AggregatedFullHttpRequest(
331 (HttpRequest) msg, Unpooled.EMPTY_BUFFER, new DefaultHttpHeaders());
332 } else if (msg instanceof HttpResponse) {
333 fullMsg = new AggregatedFullHttpResponse(
334 (HttpResponse) msg, Unpooled.EMPTY_BUFFER, new DefaultHttpHeaders());
335 } else {
336 throw new IllegalStateException();
337 }
338
339 return fullMsg;
340 }
341
342 private abstract static class AggregatedFullHttpMessage implements ByteBufHolder, FullHttpMessage {
343 protected final HttpMessage message;
344 private final ByteBuf content;
345 private HttpHeaders trailingHeaders;
346
347 AggregatedFullHttpMessage(HttpMessage message, ByteBuf content, HttpHeaders trailingHeaders) {
348 this.message = message;
349 this.content = content;
350 this.trailingHeaders = trailingHeaders;
351 }
352
353 @Override
354 public HttpHeaders trailingHeaders() {
355 HttpHeaders trailingHeaders = this.trailingHeaders;
356 if (trailingHeaders == null) {
357 return HttpHeaders.EMPTY_HEADERS;
358 } else {
359 return trailingHeaders;
360 }
361 }
362
363 void setTrailingHeaders(HttpHeaders trailingHeaders) {
364 this.trailingHeaders = trailingHeaders;
365 }
366
367 @Override
368 public HttpVersion getProtocolVersion() {
369 return message.getProtocolVersion();
370 }
371
372 @Override
373 public FullHttpMessage setProtocolVersion(HttpVersion version) {
374 message.setProtocolVersion(version);
375 return this;
376 }
377
378 @Override
379 public HttpHeaders headers() {
380 return message.headers();
381 }
382
383 @Override
384 public DecoderResult getDecoderResult() {
385 return message.getDecoderResult();
386 }
387
388 @Override
389 public void setDecoderResult(DecoderResult result) {
390 message.setDecoderResult(result);
391 }
392
393 @Override
394 public ByteBuf content() {
395 return content;
396 }
397
398 @Override
399 public int refCnt() {
400 return content.refCnt();
401 }
402
403 @Override
404 public FullHttpMessage retain() {
405 content.retain();
406 return this;
407 }
408
409 @Override
410 public FullHttpMessage retain(int increment) {
411 content.retain(increment);
412 return this;
413 }
414
415 @Override
416 public boolean release() {
417 return content.release();
418 }
419
420 @Override
421 public boolean release(int decrement) {
422 return content.release(decrement);
423 }
424
425 @Override
426 public abstract FullHttpMessage copy();
427
428 @Override
429 public abstract FullHttpMessage duplicate();
430 }
431
432 private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage implements FullHttpRequest {
433
434 AggregatedFullHttpRequest(HttpRequest request, ByteBuf content, HttpHeaders trailingHeaders) {
435 super(request, content, trailingHeaders);
436 }
437
438 @Override
439 public FullHttpRequest copy() {
440 DefaultFullHttpRequest copy = new DefaultFullHttpRequest(
441 getProtocolVersion(), getMethod(), getUri(), content().copy());
442 copy.headers().set(headers());
443 copy.trailingHeaders().set(trailingHeaders());
444 return copy;
445 }
446
447 @Override
448 public FullHttpRequest duplicate() {
449 DefaultFullHttpRequest duplicate = new DefaultFullHttpRequest(
450 getProtocolVersion(), getMethod(), getUri(), content().duplicate());
451 duplicate.headers().set(headers());
452 duplicate.trailingHeaders().set(trailingHeaders());
453 return duplicate;
454 }
455
456 @Override
457 public FullHttpRequest retain(int increment) {
458 super.retain(increment);
459 return this;
460 }
461
462 @Override
463 public FullHttpRequest retain() {
464 super.retain();
465 return this;
466 }
467
468 @Override
469 public FullHttpRequest setMethod(HttpMethod method) {
470 ((HttpRequest) message).setMethod(method);
471 return this;
472 }
473
474 @Override
475 public FullHttpRequest setUri(String uri) {
476 ((HttpRequest) message).setUri(uri);
477 return this;
478 }
479
480 @Override
481 public HttpMethod getMethod() {
482 return ((HttpRequest) message).getMethod();
483 }
484
485 @Override
486 public String getUri() {
487 return ((HttpRequest) message).getUri();
488 }
489
490 @Override
491 public FullHttpRequest setProtocolVersion(HttpVersion version) {
492 super.setProtocolVersion(version);
493 return this;
494 }
495
496 @Override
497 public String toString() {
498 return HttpMessageUtil.appendFullRequest(new StringBuilder(256), this).toString();
499 }
500 }
501
502 private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage
503 implements FullHttpResponse {
504
505 AggregatedFullHttpResponse(HttpResponse message, ByteBuf content, HttpHeaders trailingHeaders) {
506 super(message, content, trailingHeaders);
507 }
508
509 @Override
510 public FullHttpResponse copy() {
511 DefaultFullHttpResponse copy = new DefaultFullHttpResponse(
512 getProtocolVersion(), getStatus(), content().copy());
513 copy.headers().set(headers());
514 copy.trailingHeaders().set(trailingHeaders());
515 return copy;
516 }
517
518 @Override
519 public FullHttpResponse duplicate() {
520 DefaultFullHttpResponse duplicate = new DefaultFullHttpResponse(getProtocolVersion(), getStatus(),
521 content().duplicate());
522 duplicate.headers().set(headers());
523 duplicate.trailingHeaders().set(trailingHeaders());
524 return duplicate;
525 }
526
527 @Override
528 public FullHttpResponse setStatus(HttpResponseStatus status) {
529 ((HttpResponse) message).setStatus(status);
530 return this;
531 }
532
533 @Override
534 public HttpResponseStatus getStatus() {
535 return ((HttpResponse) message).getStatus();
536 }
537
538 @Override
539 public FullHttpResponse setProtocolVersion(HttpVersion version) {
540 super.setProtocolVersion(version);
541 return this;
542 }
543
544 @Override
545 public FullHttpResponse retain(int increment) {
546 super.retain(increment);
547 return this;
548 }
549
550 @Override
551 public FullHttpResponse retain() {
552 super.retain();
553 return this;
554 }
555
556 @Override
557 public String toString() {
558 return HttpMessageUtil.appendFullResponse(new StringBuilder(256), this).toString();
559 }
560 }
561 }