1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.buffer.api.CompositeBuffer;
21 import io.netty5.util.Send;
22 import io.netty5.channel.ChannelHandler;
23 import io.netty5.channel.ChannelHandlerContext;
24 import io.netty5.channel.ChannelPipeline;
25 import io.netty5.handler.codec.DecoderResult;
26 import io.netty5.handler.codec.MessageAggregator;
27 import io.netty5.util.concurrent.Future;
28 import io.netty5.util.internal.logging.InternalLogger;
29 import io.netty5.util.internal.logging.InternalLoggerFactory;
30
31 import static io.netty5.handler.codec.http.HttpHeaderNames.CONNECTION;
32 import static io.netty5.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
33 import static io.netty5.handler.codec.http.HttpHeaderNames.EXPECT;
34 import static io.netty5.handler.codec.http.HttpResponseStatus.CONTINUE;
35 import static io.netty5.handler.codec.http.HttpResponseStatus.EXPECTATION_FAILED;
36 import static io.netty5.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
37 import static io.netty5.handler.codec.http.HttpUtil.getContentLength;
38 import static io.netty5.handler.codec.http.HttpVersion.HTTP_1_1;
39 import static java.util.Objects.requireNonNullElse;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 public class HttpObjectAggregator<C extends HttpContent<C>>
93 extends MessageAggregator<HttpObject, HttpMessage, HttpContent<C>, FullHttpMessage<?>> {
94 private static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpObjectAggregator.class);
95
96 private final boolean closeOnExpectationFailed;
97
98
99
100
101
102
103
104
/**
 * Creates a new aggregator that keeps {@code closeOnExpectationFailed} switched off.
 * This is shorthand for {@code new HttpObjectAggregator(maxContentLength, false)}.
 *
 * @param maxContentLength the maximum content length, forwarded unchanged to
 *                         {@link #HttpObjectAggregator(int, boolean)}.
 */
public HttpObjectAggregator(int maxContentLength) {
    this(maxContentLength, false);
}
108
109
110
111
112
113
114
115
116
117
118
/**
 * Creates a new aggregator.
 *
 * @param maxContentLength the maximum content length, handed to the {@link MessageAggregator}
 *                         super constructor.
 * @param closeOnExpectationFailed flag stored on this instance; {@code closeAfterContinueResponse}
 *                                 only ever returns {@code true} when this flag is {@code true}.
 */
public HttpObjectAggregator(int maxContentLength, boolean closeOnExpectationFailed) {
    super(maxContentLength);
    this.closeOnExpectationFailed = closeOnExpectationFailed;
}
123
124 @Override
125 protected HttpMessage tryStartMessage(Object msg) {
126 return msg instanceof HttpMessage ? (HttpMessage) msg : null;
127 }
128
129 @SuppressWarnings("unchecked")
130 @Override
131 protected HttpContent<C> tryContentMessage(Object msg) {
132 return msg instanceof HttpContent ? (HttpContent<C>) msg : null;
133 }
134
135 @Override
136 protected boolean isAggregated(Object msg) throws Exception {
137 return msg instanceof FullHttpMessage;
138 }
139
140 @Override
141 protected int lengthForContent(HttpContent<C> msg) {
142 return msg.payload().readableBytes();
143 }
144
145 @Override
146 protected int lengthForAggregation(FullHttpMessage<?> msg) {
147 return msg.payload().readableBytes();
148 }
149
150 @Override
151 protected boolean isLastContentMessage(HttpContent<C> msg) throws Exception {
152 return msg instanceof LastHttpContent;
153 }
154
155 @Override
156 protected boolean isContentLengthInvalid(HttpMessage start, int maxContentLength) {
157 try {
158 return getContentLength(start, -1L) > maxContentLength;
159 } catch (final NumberFormatException e) {
160 return false;
161 }
162 }
163
164 private static FullHttpResponse continueResponse(HttpMessage start, int maxContentLength,
165 ChannelPipeline pipeline) {
166 if (HttpUtil.isUnsupportedExpectation(start)) {
167
168 pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);
169 return newErrorResponse(EXPECTATION_FAILED, pipeline.channel().bufferAllocator(), true, false);
170 } else if (HttpUtil.is100ContinueExpected(start)) {
171
172 if (getContentLength(start, -1L) <= maxContentLength) {
173 return newErrorResponse(CONTINUE, pipeline.channel().bufferAllocator(), false, false);
174 }
175 pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);
176 return newErrorResponse(REQUEST_ENTITY_TOO_LARGE, pipeline.channel().bufferAllocator(), true, false);
177 }
178
179 return null;
180 }
181
182 @Override
183 protected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
184 FullHttpResponse response = continueResponse(start, maxContentLength, pipeline);
185
186
187 if (response != null) {
188 start.headers().remove(EXPECT);
189 }
190 return response;
191 }
192
193 @Override
194 protected boolean closeAfterContinueResponse(Object msg) {
195 return closeOnExpectationFailed && ignoreContentAfterContinueResponse(msg);
196 }
197
198 @Override
199 protected boolean ignoreContentAfterContinueResponse(Object msg) {
200 if (msg instanceof HttpResponse) {
201 final HttpResponse httpResponse = (HttpResponse) msg;
202 return httpResponse.status().codeClass().equals(HttpStatusClass.CLIENT_ERROR);
203 }
204 return false;
205 }
206
207 @Override
208 protected FullHttpMessage<?> beginAggregation(BufferAllocator allocator, HttpMessage start) throws Exception {
209 assert !(start instanceof FullHttpMessage);
210
211 HttpUtil.setTransferEncodingChunked(start, false);
212
213 final CompositeBuffer content = allocator.compose();
214 FullHttpMessage<?> ret;
215 if (start instanceof HttpRequest) {
216 ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);
217 } else if (start instanceof HttpResponse) {
218 ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);
219 } else {
220 throw new Error();
221 }
222 return ret;
223 }
224
225 @Override
226 protected void aggregate(BufferAllocator allocator, FullHttpMessage<?> aggregated,
227 HttpContent<C> content) throws Exception {
228 final CompositeBuffer payload = (CompositeBuffer) aggregated.payload();
229 payload.extendWith(content.payload().send());
230 if (content instanceof LastHttpContent) {
231
232 ((AggregatedFullHttpMessage<?>) aggregated).setTrailingHeaders(((LastHttpContent<?>) content)
233 .trailingHeaders());
234 }
235 }
236
237 @Override
238 protected void finishAggregation(BufferAllocator allocator, FullHttpMessage<?> aggregated) throws Exception {
239
240
241
242
243
244
245 if (!HttpUtil.isContentLengthSet(aggregated)) {
246 aggregated.headers()
247 .set(CONTENT_LENGTH, String.valueOf(aggregated.payload().readableBytes()));
248 }
249 }
250
251 @Override
252 protected void handleOversizedMessage(final ChannelHandlerContext ctx, Object oversized) throws Exception {
253 if (oversized instanceof HttpRequest) {
254 HttpRequest request = (HttpRequest) oversized;
255
256
257
258
259 if (oversized instanceof FullHttpMessage ||
260 !HttpUtil.is100ContinueExpected(request) && !HttpUtil.isKeepAlive(request)) {
261 Future<Void> future = ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE,
262 ctx.bufferAllocator(), true, true));
263 future.addListener(f -> {
264 if (f.isFailed()) {
265 logger.debug("Failed to send a 413 Request Entity Too Large.", f.cause());
266 }
267 ctx.close();
268 });
269 } else {
270 ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE,
271 ctx.bufferAllocator(), true, false))
272 .addListener(future -> {
273 if (future.isFailed()) {
274 logger.debug("Failed to send a 413 Request Entity Too Large.", future.cause());
275 ctx.close();
276 }
277 });
278 }
279 } else if (oversized instanceof HttpResponse) {
280 throw new ResponseTooLargeException("Response entity too large: " + oversized);
281 } else {
282 throw new IllegalStateException();
283 }
284 }
285
286 @Override
287 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
288 super.channelExceptionCaught(ctx, cause);
289 if (cause instanceof ResponseTooLargeException) {
290 ctx.close();
291 }
292 }
293
294 private static FullHttpResponse newErrorResponse(HttpResponseStatus status, BufferAllocator allocator,
295 boolean emptyContent, boolean closeConnection) {
296 FullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, status, allocator.allocate(0));
297 if (emptyContent) {
298 resp.headers().set(CONTENT_LENGTH, 0);
299 }
300 if (closeConnection) {
301 resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
302 }
303 return resp;
304 }
305
306 private static final class ResponseTooLargeException extends TooLongHttpContentException {
307 ResponseTooLargeException(String message) {
308 super(message);
309 }
310 }
311
312 private abstract static class AggregatedFullHttpMessage<R extends FullHttpMessage<R>>
313 implements FullHttpMessage<R> {
314 protected final HttpMessage message;
315 private final Buffer payload;
316 private HttpHeaders trailingHeaders;
317
318 AggregatedFullHttpMessage(HttpMessage message, Buffer payload, HttpHeaders trailingHeaders) {
319 this.message = message;
320 this.payload = payload;
321 this.trailingHeaders = trailingHeaders;
322 }
323
324 @Override
325 public void close() {
326 payload.close();
327 }
328
329 @Override
330 public boolean isAccessible() {
331 return payload.isAccessible();
332 }
333
334 @Override
335 public Buffer payload() {
336 return payload;
337 }
338
339 @Override
340 public HttpHeaders trailingHeaders() {
341 HttpHeaders trailingHeaders = this.trailingHeaders;
342 return requireNonNullElse(trailingHeaders, EmptyHttpHeaders.INSTANCE);
343 }
344
345 void setTrailingHeaders(HttpHeaders trailingHeaders) {
346 this.trailingHeaders = trailingHeaders;
347 }
348
349 @Override
350 public HttpVersion getProtocolVersion() {
351 return message.protocolVersion();
352 }
353
354 @Override
355 public HttpVersion protocolVersion() {
356 return message.protocolVersion();
357 }
358
359 @Override
360 public FullHttpMessage<R> setProtocolVersion(HttpVersion version) {
361 message.setProtocolVersion(version);
362 return this;
363 }
364
365 @Override
366 public HttpHeaders headers() {
367 return message.headers();
368 }
369
370 @Override
371 public DecoderResult decoderResult() {
372 return message.decoderResult();
373 }
374
375 @Override
376 public void setDecoderResult(DecoderResult result) {
377 message.setDecoderResult(result);
378 }
379 }
380
381 private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage<FullHttpRequest>
382 implements FullHttpRequest {
383
384 AggregatedFullHttpRequest(HttpRequest request, Buffer content, HttpHeaders trailingHeaders) {
385 super(request, content, trailingHeaders);
386 }
387
388 @Override
389 public Send<FullHttpRequest> send() {
390 return payload().send().map(FullHttpRequest.class,
391 p -> new AggregatedFullHttpRequest(this, p, trailingHeaders()));
392 }
393
394 @Override
395 public AggregatedFullHttpRequest copy() {
396 return new AggregatedFullHttpRequest(this, payload().copy(), trailingHeaders().copy());
397 }
398
399 @Override
400 public FullHttpRequest touch(Object hint) {
401 payload().touch(hint);
402 return this;
403 }
404
405 @Override
406 public FullHttpRequest setMethod(HttpMethod method) {
407 ((HttpRequest) message).setMethod(method);
408 return this;
409 }
410
411 @Override
412 public FullHttpRequest setUri(String uri) {
413 ((HttpRequest) message).setUri(uri);
414 return this;
415 }
416
417 @Override
418 public HttpMethod method() {
419 return ((HttpRequest) message).method();
420 }
421
422 @Override
423 public String uri() {
424 return ((HttpRequest) message).uri();
425 }
426
427 @Override
428 public FullHttpRequest setProtocolVersion(HttpVersion version) {
429 super.setProtocolVersion(version);
430 return this;
431 }
432
433 @Override
434 public String toString() {
435 return HttpMessageUtil.appendFullRequest(new StringBuilder(256), this).toString();
436 }
437 }
438
439 private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage<FullHttpResponse>
440 implements FullHttpResponse {
441
442 AggregatedFullHttpResponse(HttpResponse message, Buffer content, HttpHeaders trailingHeaders) {
443 super(message, content, trailingHeaders);
444 }
445
446 @Override
447 public Send<FullHttpResponse> send() {
448 return payload().send().map(FullHttpResponse.class,
449 p -> new AggregatedFullHttpResponse(this, p, trailingHeaders()));
450 }
451
452 @Override
453 public AggregatedFullHttpResponse copy() {
454 return new AggregatedFullHttpResponse(this, payload().copy(), trailingHeaders().copy());
455 }
456
457 @Override
458 public FullHttpResponse touch(Object hint) {
459 payload().touch(hint);
460 return this;
461 }
462
463 @Override
464 public FullHttpResponse setStatus(HttpResponseStatus status) {
465 ((HttpResponse) message).setStatus(status);
466 return this;
467 }
468
469 @Override
470 public HttpResponseStatus status() {
471 return ((HttpResponse) message).status();
472 }
473
474 @Override
475 public FullHttpResponse setProtocolVersion(HttpVersion version) {
476 super.setProtocolVersion(version);
477 return this;
478 }
479
480 @Override
481 public String toString() {
482 return HttpMessageUtil.appendFullResponse(new StringBuilder(256), this).toString();
483 }
484 }
485 }