1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufUtil;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.channel.FileRegion;
24 import io.netty.handler.codec.EncoderException;
25 import io.netty.handler.codec.MessageToMessageEncoder;
26 import io.netty.util.CharsetUtil;
27 import io.netty.util.LeakPresenceDetector;
28 import io.netty.util.ReferenceCountUtil;
29 import io.netty.util.concurrent.PromiseCombiner;
30 import io.netty.util.internal.StringUtil;
31
32 import java.util.ArrayList;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map.Entry;
36
37 import static io.netty.buffer.Unpooled.directBuffer;
38 import static io.netty.buffer.Unpooled.unreleasableBuffer;
39 import static io.netty.handler.codec.http.HttpConstants.CR;
40 import static io.netty.handler.codec.http.HttpConstants.LF;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToMessageEncoder<Object> {
56
57
58 private static final int COPY_CONTENT_THRESHOLD = 128;
59 static final int CRLF_SHORT = (CR << 8) | LF;
60 private static final int ZERO_CRLF_MEDIUM = ('0' << 16) | CRLF_SHORT;
61 private static final byte[] ZERO_CRLF_CRLF = { '0', CR, LF, CR, LF };
62 private static final ByteBuf CRLF_BUF = LeakPresenceDetector.staticInitializer(() -> unreleasableBuffer(
63 directBuffer(2).writeByte(CR).writeByte(LF)).asReadOnly());
64 private static final ByteBuf ZERO_CRLF_CRLF_BUF = LeakPresenceDetector.staticInitializer(() -> unreleasableBuffer(
65 directBuffer(ZERO_CRLF_CRLF.length).writeBytes(ZERO_CRLF_CRLF)).asReadOnly());
66 private static final float HEADERS_WEIGHT_NEW = 1 / 5f;
67 private static final float HEADERS_WEIGHT_HISTORICAL = 1 - HEADERS_WEIGHT_NEW;
68 private static final float TRAILERS_WEIGHT_NEW = HEADERS_WEIGHT_NEW;
69 private static final float TRAILERS_WEIGHT_HISTORICAL = HEADERS_WEIGHT_HISTORICAL;
70
71 private static final int ST_INIT = 0;
72 private static final int ST_CONTENT_NON_CHUNK = 1;
73 private static final int ST_CONTENT_CHUNK = 2;
74 private static final int ST_CONTENT_ALWAYS_EMPTY = 3;
75
76 @SuppressWarnings("RedundantFieldInitialization")
77 private int state = ST_INIT;
78
79
80
81
82
83 private float headersEncodedSizeAccumulator = 256;
84
85
86
87
88
89 private float trailersEncodedSizeAccumulator = 256;
90
91 private final List<Object> out = new ArrayList<Object>();
92
93 private static boolean checkContentState(int state) {
94 return state == ST_CONTENT_CHUNK || state == ST_CONTENT_NON_CHUNK || state == ST_CONTENT_ALWAYS_EMPTY;
95 }
96
97 public HttpObjectEncoder() {
98 super(Object.class);
99 }
100
101 @Override
102 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
103 try {
104 if (acceptOutboundMessage(msg)) {
105 encode(ctx, msg, out);
106 if (out.isEmpty()) {
107 throw new EncoderException(
108 StringUtil.simpleClassName(this) + " must produce at least one message.");
109 }
110 } else {
111 ctx.write(msg, promise);
112 }
113 } catch (EncoderException e) {
114 throw e;
115 } catch (Throwable t) {
116 throw new EncoderException(t);
117 } finally {
118 writeOutList(ctx, out, promise);
119 }
120 }
121
122 private static void writeOutList(ChannelHandlerContext ctx, List<Object> out, ChannelPromise promise) {
123 final int size = out.size();
124 try {
125 if (size == 1) {
126 ctx.write(out.get(0), promise);
127 } else if (size > 1) {
128
129
130 if (promise == ctx.voidPromise()) {
131 writeVoidPromise(ctx, out);
132 } else {
133 writePromiseCombiner(ctx, out, promise);
134 }
135 }
136 } finally {
137 out.clear();
138 }
139 }
140
141 private static void writeVoidPromise(ChannelHandlerContext ctx, List<Object> out) {
142 final ChannelPromise voidPromise = ctx.voidPromise();
143 for (int i = 0; i < out.size(); i++) {
144 ctx.write(out.get(i), voidPromise);
145 }
146 }
147
148 private static void writePromiseCombiner(ChannelHandlerContext ctx, List<Object> out, ChannelPromise promise) {
149 final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
150 for (int i = 0; i < out.size(); i++) {
151 combiner.add(ctx.write(out.get(i)));
152 }
153 combiner.finish(promise);
154 }
155
156 @Override
157 @SuppressWarnings("ConditionCoveredByFurtherCondition")
158 protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
159
160 if (msg == Unpooled.EMPTY_BUFFER) {
161 out.add(Unpooled.EMPTY_BUFFER);
162 return;
163 }
164
165
166
167
168
169 if (msg instanceof FullHttpMessage) {
170 encodeFullHttpMessage(ctx, msg, out);
171 return;
172 }
173 if (msg instanceof HttpMessage) {
174 final H m;
175 try {
176 m = (H) msg;
177 } catch (Exception rethrow) {
178 ReferenceCountUtil.release(msg);
179 throw rethrow;
180 }
181 if (m instanceof LastHttpContent) {
182 encodeHttpMessageLastContent(ctx, m, out);
183 } else if (m instanceof HttpContent) {
184 encodeHttpMessageNotLastContent(ctx, m, out);
185 } else {
186 encodeJustHttpMessage(ctx, m, out);
187 }
188 } else {
189 encodeNotHttpMessageContentTypes(ctx, msg, out);
190 }
191 }
192
193 private void encodeJustHttpMessage(ChannelHandlerContext ctx, H m, List<Object> out) throws Exception {
194 assert !(m instanceof HttpContent);
195 try {
196 if (state != ST_INIT) {
197 throwUnexpectedMessageTypeEx(m, state);
198 }
199 final ByteBuf buf = encodeInitHttpMessage(ctx, m);
200
201 assert checkContentState(state);
202
203 out.add(buf);
204 } finally {
205 ReferenceCountUtil.release(m);
206 }
207 }
208
209 private void encodeByteBufHttpContent(int state, ChannelHandlerContext ctx, ByteBuf buf, ByteBuf content,
210 HttpHeaders trailingHeaders, List<Object> out) {
211 switch (state) {
212 case ST_CONTENT_NON_CHUNK:
213 if (encodeContentNonChunk(out, buf, content)) {
214 break;
215 }
216
217 case ST_CONTENT_ALWAYS_EMPTY:
218
219 out.add(buf);
220 break;
221 case ST_CONTENT_CHUNK:
222
223 out.add(buf);
224 encodeChunkedHttpContent(ctx, content, trailingHeaders, out);
225 break;
226 default:
227 throw new Error("Unexpected http object encoder state: " + state);
228 }
229 }
230
231 private void encodeHttpMessageNotLastContent(ChannelHandlerContext ctx, H m, List<Object> out) throws Exception {
232 assert m instanceof HttpContent;
233 assert !(m instanceof LastHttpContent);
234 final HttpContent httpContent = (HttpContent) m;
235 try {
236 if (state != ST_INIT) {
237 throwUnexpectedMessageTypeEx(m, state);
238 }
239 final ByteBuf buf = encodeInitHttpMessage(ctx, m);
240
241 assert checkContentState(state);
242
243 encodeByteBufHttpContent(state, ctx, buf, httpContent.content(), null, out);
244 } finally {
245 httpContent.release();
246 }
247 }
248
249 private void encodeHttpMessageLastContent(ChannelHandlerContext ctx, H m, List<Object> out) throws Exception {
250 assert m instanceof LastHttpContent;
251 final LastHttpContent httpContent = (LastHttpContent) m;
252 try {
253 if (state != ST_INIT) {
254 throwUnexpectedMessageTypeEx(m, state);
255 }
256 final ByteBuf buf = encodeInitHttpMessage(ctx, m);
257
258 assert checkContentState(state);
259
260 encodeByteBufHttpContent(state, ctx, buf, httpContent.content(), httpContent.trailingHeaders(), out);
261
262 state = ST_INIT;
263 } finally {
264 httpContent.release();
265 }
266 }
267 @SuppressWarnings("ConditionCoveredByFurtherCondition")
268 private void encodeNotHttpMessageContentTypes(ChannelHandlerContext ctx, Object msg, List<Object> out) {
269 assert !(msg instanceof HttpMessage);
270 if (state == ST_INIT) {
271 try {
272 if (msg instanceof ByteBuf && bypassEncoderIfEmpty((ByteBuf) msg, out)) {
273 return;
274 }
275 throwUnexpectedMessageTypeEx(msg, ST_INIT);
276 } finally {
277 ReferenceCountUtil.release(msg);
278 }
279 }
280 if (msg == LastHttpContent.EMPTY_LAST_CONTENT) {
281 state = encodeEmptyLastHttpContent(state, out);
282 return;
283 }
284 if (msg instanceof LastHttpContent) {
285 encodeLastHttpContent(ctx, (LastHttpContent) msg, out);
286 return;
287 }
288 if (msg instanceof HttpContent) {
289 encodeHttpContent(ctx, (HttpContent) msg, out);
290 return;
291 }
292 if (msg instanceof ByteBuf) {
293 encodeByteBufContent(ctx, (ByteBuf) msg, out);
294 return;
295 }
296 if (msg instanceof FileRegion) {
297 encodeFileRegionContent(ctx, (FileRegion) msg, out);
298 return;
299 }
300 try {
301 throwUnexpectedMessageTypeEx(msg, state);
302 } finally {
303 ReferenceCountUtil.release(msg);
304 }
305 }
306
307 private void encodeFullHttpMessage(ChannelHandlerContext ctx, Object o, List<Object> out)
308 throws Exception {
309 assert o instanceof FullHttpMessage;
310 final FullHttpMessage msg = (FullHttpMessage) o;
311 try {
312 if (state != ST_INIT) {
313 throwUnexpectedMessageTypeEx(o, state);
314 }
315
316 final H m = (H) o;
317
318 final int state = isContentAlwaysEmpty(m) ? ST_CONTENT_ALWAYS_EMPTY :
319 HttpUtil.isTransferEncodingChunked(m) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;
320
321 ByteBuf content = msg.content();
322
323 final boolean accountForContentSize = content.readableBytes() > 0 &&
324 state == ST_CONTENT_NON_CHUNK &&
325
326
327
328 content.readableBytes() <=
329 Math.max(COPY_CONTENT_THRESHOLD, ((int) headersEncodedSizeAccumulator) / 8);
330
331 final int headersAndContentSize = (int) headersEncodedSizeAccumulator +
332 (accountForContentSize? content.readableBytes() : 0);
333 final ByteBuf buf = ctx.alloc().buffer(headersAndContentSize);
334
335 encodeInitialLine(buf, m);
336
337 sanitizeHeadersBeforeEncode(m, state == ST_CONTENT_ALWAYS_EMPTY);
338
339 encodeHeaders(m.headers(), buf);
340 ByteBufUtil.writeShortBE(buf, CRLF_SHORT);
341
342
343 headersEncodedSizeAccumulator = HEADERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
344 HEADERS_WEIGHT_HISTORICAL * headersEncodedSizeAccumulator;
345
346 encodeByteBufHttpContent(state, ctx, buf, content, msg.trailingHeaders(), out);
347 } finally {
348 msg.release();
349 }
350 }
351
352 private static boolean encodeContentNonChunk(List<Object> out, ByteBuf buf, ByteBuf content) {
353 final int contentLength = content.readableBytes();
354 if (contentLength > 0) {
355 if (buf.maxFastWritableBytes() >= contentLength) {
356
357 buf.writeBytes(content);
358 out.add(buf);
359 } else {
360 out.add(buf);
361 out.add(content.retain());
362 }
363 return true;
364 }
365 return false;
366 }
367
368 private static void throwUnexpectedMessageTypeEx(Object msg, int state) {
369 throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg)
370 + ", state: " + state);
371 }
372
373 private void encodeFileRegionContent(ChannelHandlerContext ctx, FileRegion msg, List<Object> out) {
374 try {
375 assert state != ST_INIT;
376 switch (state) {
377 case ST_CONTENT_NON_CHUNK:
378 if (msg.count() > 0) {
379 out.add(msg.retain());
380 break;
381 }
382
383
384 case ST_CONTENT_ALWAYS_EMPTY:
385
386
387
388
389
390
391
392 out.add(Unpooled.EMPTY_BUFFER);
393 break;
394 case ST_CONTENT_CHUNK:
395 encodedChunkedFileRegionContent(ctx, msg, out);
396 break;
397 default:
398 throw new Error("Unexpected http object encoder state: " + state);
399 }
400 } finally {
401 msg.release();
402 }
403 }
404
405
406
407
408
409
410 private static boolean bypassEncoderIfEmpty(ByteBuf msg, List<Object> out) {
411 if (!msg.isReadable()) {
412 out.add(msg.retain());
413 return true;
414 }
415 return false;
416 }
417
418 private void encodeByteBufContent(ChannelHandlerContext ctx, ByteBuf content, List<Object> out) {
419 try {
420 assert state != ST_INIT;
421 if (bypassEncoderIfEmpty(content, out)) {
422 return;
423 }
424 encodeByteBufAndTrailers(state, ctx, out, content, null);
425 } finally {
426 content.release();
427 }
428 }
429
430 private static int encodeEmptyLastHttpContent(int state, List<Object> out) {
431 assert state != ST_INIT;
432
433 switch (state) {
434 case ST_CONTENT_NON_CHUNK:
435 case ST_CONTENT_ALWAYS_EMPTY:
436 out.add(Unpooled.EMPTY_BUFFER);
437 break;
438 case ST_CONTENT_CHUNK:
439 out.add(ZERO_CRLF_CRLF_BUF.duplicate());
440 break;
441 default:
442 throw new Error("Unexpected http object encoder state: " + state);
443 }
444 return ST_INIT;
445 }
446
447 private void encodeLastHttpContent(ChannelHandlerContext ctx, LastHttpContent msg, List<Object> out) {
448 assert state != ST_INIT;
449 assert !(msg instanceof HttpMessage);
450 try {
451 encodeByteBufAndTrailers(state, ctx, out, msg.content(), msg.trailingHeaders());
452 state = ST_INIT;
453 } finally {
454 msg.release();
455 }
456 }
457
458 private void encodeHttpContent(ChannelHandlerContext ctx, HttpContent msg, List<Object> out) {
459 assert state != ST_INIT;
460 assert !(msg instanceof HttpMessage);
461 assert !(msg instanceof LastHttpContent);
462 try {
463 this.encodeByteBufAndTrailers(state, ctx, out, msg.content(), null);
464 } finally {
465 msg.release();
466 }
467 }
468
469 private void encodeByteBufAndTrailers(int state, ChannelHandlerContext ctx, List<Object> out, ByteBuf content,
470 HttpHeaders trailingHeaders) {
471 switch (state) {
472 case ST_CONTENT_NON_CHUNK:
473 if (content.isReadable()) {
474 out.add(content.retain());
475 break;
476 }
477
478 case ST_CONTENT_ALWAYS_EMPTY:
479 out.add(Unpooled.EMPTY_BUFFER);
480 break;
481 case ST_CONTENT_CHUNK:
482 encodeChunkedHttpContent(ctx, content, trailingHeaders, out);
483 break;
484 default:
485 throw new Error("Unexpected http object encoder state: " + state);
486 }
487 }
488
489 private void encodeChunkedHttpContent(ChannelHandlerContext ctx, ByteBuf content, HttpHeaders trailingHeaders,
490 List<Object> out) {
491 final int contentLength = content.readableBytes();
492 if (contentLength > 0) {
493 addEncodedLengthHex(ctx, contentLength, out);
494 out.add(content.retain());
495 out.add(CRLF_BUF.duplicate());
496 }
497 if (trailingHeaders != null) {
498 encodeTrailingHeaders(ctx, trailingHeaders, out);
499 } else if (contentLength == 0) {
500
501
502 out.add(content.retain());
503 }
504 }
505
506 private void encodeTrailingHeaders(ChannelHandlerContext ctx, HttpHeaders trailingHeaders, List<Object> out) {
507 if (trailingHeaders.isEmpty()) {
508 out.add(ZERO_CRLF_CRLF_BUF.duplicate());
509 } else {
510 ByteBuf buf = ctx.alloc().buffer((int) trailersEncodedSizeAccumulator);
511 ByteBufUtil.writeMediumBE(buf, ZERO_CRLF_MEDIUM);
512 encodeHeaders(trailingHeaders, buf);
513 ByteBufUtil.writeShortBE(buf, CRLF_SHORT);
514 trailersEncodedSizeAccumulator = TRAILERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
515 TRAILERS_WEIGHT_HISTORICAL * trailersEncodedSizeAccumulator;
516 out.add(buf);
517 }
518 }
519
520 private ByteBuf encodeInitHttpMessage(ChannelHandlerContext ctx, H m) throws Exception {
521 assert state == ST_INIT;
522
523 ByteBuf buf = ctx.alloc().buffer((int) headersEncodedSizeAccumulator);
524
525 encodeInitialLine(buf, m);
526 state = isContentAlwaysEmpty(m) ? ST_CONTENT_ALWAYS_EMPTY :
527 HttpUtil.isTransferEncodingChunked(m) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;
528
529 sanitizeHeadersBeforeEncode(m, state == ST_CONTENT_ALWAYS_EMPTY);
530
531 encodeHeaders(m.headers(), buf);
532 ByteBufUtil.writeShortBE(buf, CRLF_SHORT);
533
534 headersEncodedSizeAccumulator = HEADERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
535 HEADERS_WEIGHT_HISTORICAL * headersEncodedSizeAccumulator;
536 return buf;
537 }
538
539
540
541
542 protected void encodeHeaders(HttpHeaders headers, ByteBuf buf) {
543 Iterator<Entry<CharSequence, CharSequence>> iter = headers.iteratorCharSequence();
544 while (iter.hasNext()) {
545 Entry<CharSequence, CharSequence> header = iter.next();
546 HttpHeadersEncoder.encoderHeader(header.getKey(), header.getValue(), buf);
547 }
548 }
549
550 private static void encodedChunkedFileRegionContent(ChannelHandlerContext ctx, FileRegion msg, List<Object> out) {
551 final long contentLength = msg.count();
552 if (contentLength > 0) {
553 addEncodedLengthHex(ctx, contentLength, out);
554 out.add(msg.retain());
555 out.add(CRLF_BUF.duplicate());
556 } else if (contentLength == 0) {
557
558
559 out.add(msg.retain());
560 }
561 }
562
563 private static void addEncodedLengthHex(ChannelHandlerContext ctx, long contentLength, List<Object> out) {
564 String lengthHex = Long.toHexString(contentLength);
565 ByteBuf buf = ctx.alloc().buffer(lengthHex.length() + 2);
566 buf.writeCharSequence(lengthHex, CharsetUtil.US_ASCII);
567 ByteBufUtil.writeShortBE(buf, CRLF_SHORT);
568 out.add(buf);
569 }
570
571
572
573
574 protected void sanitizeHeadersBeforeEncode(@SuppressWarnings("unused") H msg, boolean isAlwaysEmpty) {
575
576 }
577
578
579
580
581
582
583
584
585 protected boolean isContentAlwaysEmpty(@SuppressWarnings("unused") H msg) {
586 return false;
587 }
588
589 @Override
590 @SuppressWarnings("ConditionCoveredByFurtherCondition")
591 public boolean acceptOutboundMessage(Object msg) throws Exception {
592 return msg == Unpooled.EMPTY_BUFFER ||
593 msg == LastHttpContent.EMPTY_LAST_CONTENT ||
594 msg instanceof FullHttpMessage ||
595 msg instanceof HttpMessage ||
596 msg instanceof LastHttpContent ||
597 msg instanceof HttpContent ||
598 msg instanceof ByteBuf || msg instanceof FileRegion;
599 }
600
601
602
603
604
605
606
607
608 private static int padSizeForAccumulation(int readableBytes) {
609 return (readableBytes << 2) / 3;
610 }
611
612 @Deprecated
613 protected static void encodeAscii(String s, ByteBuf buf) {
614 buf.writeCharSequence(s, CharsetUtil.US_ASCII);
615 }
616
617 protected abstract void encodeInitialLine(ByteBuf buf, H message) throws Exception;
618 }