1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty.handler.codec.http;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufUtil;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.channel.FileRegion;
24  import io.netty.handler.codec.EncoderException;
25  import io.netty.handler.codec.MessageToMessageEncoder;
26  import io.netty.util.CharsetUtil;
27  import io.netty.util.LeakPresenceDetector;
28  import io.netty.util.ReferenceCountUtil;
29  import io.netty.util.concurrent.PromiseCombiner;
30  import io.netty.util.internal.StringUtil;
31  
32  import java.util.ArrayList;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map.Entry;
36  
37  import static io.netty.buffer.Unpooled.directBuffer;
38  import static io.netty.buffer.Unpooled.unreleasableBuffer;
39  import static io.netty.handler.codec.http.HttpConstants.CR;
40  import static io.netty.handler.codec.http.HttpConstants.LF;
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToMessageEncoder<Object> {
56  
57      
    /**
     * Lower bound (in bytes) of the content size for which {@code encodeFullHttpMessage} reserves extra room in
     * the head buffer; the effective limit is the larger of this value and 1/8 of the estimated header size.
     */
    private static final int COPY_CONTENT_THRESHOLD = 128;
    /** {@code CR LF} packed into 16 bits so both bytes can be written with one {@code writeShortBE} call. */
    static final int CRLF_SHORT = (CR << 8) | LF;
    /** {@code '0' CR LF} packed into 24 bits so the bytes can be written with one {@code writeMediumBE} call. */
    private static final int ZERO_CRLF_MEDIUM = ('0' << 16) | CRLF_SHORT;
    /** The byte sequence {@code '0' CR LF CR LF}, used to fill {@link #ZERO_CRLF_CRLF_BUF}. */
    private static final byte[] ZERO_CRLF_CRLF = { '0', CR, LF, CR, LF };
    /** Shared read-only, unreleasable buffer holding {@code CR LF}; users add a {@code duplicate()} of it. */
    private static final ByteBuf CRLF_BUF = LeakPresenceDetector.staticInitializer(() -> unreleasableBuffer(
            directBuffer(2).writeByte(CR).writeByte(LF)).asReadOnly());
    /**
     * Shared read-only, unreleasable buffer holding {@code '0' CR LF CR LF}; a {@code duplicate()} of it is
     * emitted to end a chunked body that has no trailing headers.
     */
    private static final ByteBuf ZERO_CRLF_CRLF_BUF = LeakPresenceDetector.staticInitializer(() -> unreleasableBuffer(
            directBuffer(ZERO_CRLF_CRLF.length).writeBytes(ZERO_CRLF_CRLF)).asReadOnly());
    /** Weight given to the most recent (padded) header size when updating the header size estimate. */
    private static final float HEADERS_WEIGHT_NEW = 1 / 5f;
    /** Weight given to the previous header size estimate; complements {@link #HEADERS_WEIGHT_NEW} to 1. */
    private static final float HEADERS_WEIGHT_HISTORICAL = 1 - HEADERS_WEIGHT_NEW;
    /** Weight given to the most recent (padded) trailer size; same value as for headers. */
    private static final float TRAILERS_WEIGHT_NEW = HEADERS_WEIGHT_NEW;
    /** Weight given to the previous trailer size estimate; same value as for headers. */
    private static final float TRAILERS_WEIGHT_HISTORICAL = HEADERS_WEIGHT_HISTORICAL;

    /** No message head has been encoded yet (or the previous message was completed). */
    private static final int ST_INIT = 0;
    /** A head was encoded and the content is forwarded without chunk framing. */
    private static final int ST_CONTENT_NON_CHUNK = 1;
    /** A head was encoded and every piece of content is wrapped in chunk framing. */
    private static final int ST_CONTENT_CHUNK = 2;
    /** A head was encoded for a message whose content is never written; empty buffers are emitted instead. */
    private static final int ST_CONTENT_ALWAYS_EMPTY = 3;

    /** Current position in the message life cycle; one of the {@code ST_*} constants. */
    @SuppressWarnings("RedundantFieldInitialization")
    private int state = ST_INIT;

    /**
     * Weighted moving average of the padded size of encoded heads (initial line plus headers).
     * It is used as the initial capacity of the next head buffer and starts at 256.
     */
    private float headersEncodedSizeAccumulator = 256;

    /**
     * Weighted moving average of the padded size of encoded trailing headers.
     * It is used as the initial capacity of the next trailer buffer and starts at 256.
     */
    private float trailersEncodedSizeAccumulator = 256;

    /** Reusable output list filled by {@code encode} and drained and cleared by {@code writeOutList}. */
    private final List<Object> out = new ArrayList<Object>();
92  
93      private static boolean checkContentState(int state) {
94          return state == ST_CONTENT_CHUNK || state == ST_CONTENT_NON_CHUNK || state == ST_CONTENT_ALWAYS_EMPTY;
95      }
96  
    /**
     * Creates the encoder, passing {@code Object.class} to the superclass constructor. Which messages are
     * actually encoded is decided by {@link #acceptOutboundMessage(Object)}.
     */
    public HttpObjectEncoder() {
        super(Object.class);
    }
100 
101     @Override
102     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
103         try {
104             if (acceptOutboundMessage(msg)) {
105                 encode(ctx, msg, out);
106                 if (out.isEmpty()) {
107                     throw new EncoderException(
108                             StringUtil.simpleClassName(this) + " must produce at least one message.");
109                 }
110             } else {
111                 ctx.write(msg, promise);
112             }
113         } catch (EncoderException e) {
114             throw e;
115         } catch (Throwable t) {
116             throw new EncoderException(t);
117         } finally {
118             writeOutList(ctx, out, promise);
119         }
120     }
121 
122     private static void writeOutList(ChannelHandlerContext ctx, List<Object> out, ChannelPromise promise) {
123         final int size = out.size();
124         try {
125             if (size == 1) {
126                 ctx.write(out.get(0), promise);
127             } else if (size > 1) {
128                 
129                 
130                 if (promise == ctx.voidPromise()) {
131                     writeVoidPromise(ctx, out);
132                 } else {
133                     writePromiseCombiner(ctx, out, promise);
134                 }
135             }
136         } finally {
137             out.clear();
138         }
139     }
140 
141     private static void writeVoidPromise(ChannelHandlerContext ctx, List<Object> out) {
142         final ChannelPromise voidPromise = ctx.voidPromise();
143         for (int i = 0; i < out.size(); i++) {
144             ctx.write(out.get(i), voidPromise);
145         }
146     }
147 
148     private static void writePromiseCombiner(ChannelHandlerContext ctx, List<Object> out, ChannelPromise promise) {
149         final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
150         for (int i = 0; i < out.size(); i++) {
151             combiner.add(ctx.write(out.get(i)));
152         }
153         combiner.finish(promise);
154     }
155 
156     @Override
157     @SuppressWarnings("ConditionCoveredByFurtherCondition")
158     protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
159         
160         if (msg == Unpooled.EMPTY_BUFFER) {
161             out.add(Unpooled.EMPTY_BUFFER);
162             return;
163         }
164         
165         
166         
167         
168         
169         if (msg instanceof FullHttpMessage) {
170             encodeFullHttpMessage(ctx, msg, out);
171             return;
172         }
173         if (msg instanceof HttpMessage) {
174             final H m;
175             try {
176                 m = (H) msg;
177             } catch (Exception rethrow) {
178                 ReferenceCountUtil.release(msg);
179                 throw rethrow;
180             }
181             if (m instanceof LastHttpContent) {
182                 encodeHttpMessageLastContent(ctx, m, out);
183             } else if (m instanceof HttpContent) {
184                 encodeHttpMessageNotLastContent(ctx, m, out);
185             } else {
186                 encodeJustHttpMessage(ctx, m, out);
187             }
188         } else {
189             encodeNotHttpMessageContentTypes(ctx, msg, out);
190         }
191     }
192 
193     private void encodeJustHttpMessage(ChannelHandlerContext ctx, H m, List<Object> out) throws Exception {
194         assert !(m instanceof HttpContent);
195         try {
196             if (state != ST_INIT) {
197                 throwUnexpectedMessageTypeEx(m, state);
198             }
199             final ByteBuf buf = encodeInitHttpMessage(ctx, m);
200 
201             assert checkContentState(state);
202 
203             out.add(buf);
204         } finally {
205             ReferenceCountUtil.release(m);
206         }
207     }
208 
209     private void encodeByteBufHttpContent(int state, ChannelHandlerContext ctx, ByteBuf buf, ByteBuf content,
210                                           HttpHeaders trailingHeaders, List<Object> out) {
211         switch (state) {
212             case ST_CONTENT_NON_CHUNK:
213                 if (encodeContentNonChunk(out, buf, content)) {
214                     break;
215                 }
216                 
217             case ST_CONTENT_ALWAYS_EMPTY:
218                 
219                 out.add(buf);
220                 break;
221             case ST_CONTENT_CHUNK:
222                 
223                 out.add(buf);
224                 encodeChunkedHttpContent(ctx, content, trailingHeaders, out);
225                 break;
226             default:
227                 throw new Error("Unexpected http object encoder state: " + state);
228         }
229     }
230 
231     private void encodeHttpMessageNotLastContent(ChannelHandlerContext ctx, H m, List<Object> out) throws Exception {
232         assert m instanceof HttpContent;
233         assert !(m instanceof LastHttpContent);
234         final HttpContent httpContent = (HttpContent) m;
235         try {
236             if (state != ST_INIT) {
237                 throwUnexpectedMessageTypeEx(m, state);
238             }
239             final ByteBuf buf = encodeInitHttpMessage(ctx, m);
240 
241             assert checkContentState(state);
242 
243             encodeByteBufHttpContent(state, ctx, buf, httpContent.content(), null, out);
244         } finally {
245             httpContent.release();
246         }
247     }
248 
249     private void encodeHttpMessageLastContent(ChannelHandlerContext ctx, H m, List<Object> out) throws Exception {
250         assert m instanceof LastHttpContent;
251         final LastHttpContent httpContent = (LastHttpContent) m;
252         try {
253             if (state != ST_INIT) {
254                 throwUnexpectedMessageTypeEx(m, state);
255             }
256             final ByteBuf buf = encodeInitHttpMessage(ctx, m);
257 
258             assert checkContentState(state);
259 
260             encodeByteBufHttpContent(state, ctx, buf, httpContent.content(), httpContent.trailingHeaders(), out);
261 
262             state = ST_INIT;
263         } finally {
264             httpContent.release();
265         }
266     }
267     @SuppressWarnings("ConditionCoveredByFurtherCondition")
268     private void encodeNotHttpMessageContentTypes(ChannelHandlerContext ctx, Object msg, List<Object> out) {
269         assert !(msg instanceof HttpMessage);
270         if (state == ST_INIT) {
271             try {
272                 if (msg instanceof ByteBuf && bypassEncoderIfEmpty((ByteBuf) msg, out)) {
273                     return;
274                 }
275                 throwUnexpectedMessageTypeEx(msg, ST_INIT);
276             } finally {
277                 ReferenceCountUtil.release(msg);
278             }
279         }
280         if (msg == LastHttpContent.EMPTY_LAST_CONTENT) {
281             state = encodeEmptyLastHttpContent(state, out);
282             return;
283         }
284         if (msg instanceof LastHttpContent) {
285             encodeLastHttpContent(ctx, (LastHttpContent) msg, out);
286             return;
287         }
288         if (msg instanceof HttpContent) {
289             encodeHttpContent(ctx, (HttpContent) msg, out);
290             return;
291         }
292         if (msg instanceof ByteBuf) {
293             encodeByteBufContent(ctx, (ByteBuf) msg, out);
294             return;
295         }
296         if (msg instanceof FileRegion) {
297             encodeFileRegionContent(ctx, (FileRegion) msg, out);
298             return;
299         }
300         try {
301             throwUnexpectedMessageTypeEx(msg, state);
302         } finally {
303             ReferenceCountUtil.release(msg);
304         }
305     }
306 
307     private void encodeFullHttpMessage(ChannelHandlerContext ctx, Object o, List<Object> out)
308             throws Exception {
309         assert o instanceof FullHttpMessage;
310         final FullHttpMessage msg = (FullHttpMessage) o;
311         try {
312             if (state != ST_INIT) {
313                 throwUnexpectedMessageTypeEx(o, state);
314             }
315 
316             final H m = (H) o;
317 
318             final int state = isContentAlwaysEmpty(m) ? ST_CONTENT_ALWAYS_EMPTY :
319                     HttpUtil.isTransferEncodingChunked(m) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;
320 
321             ByteBuf content = msg.content();
322 
323             final boolean accountForContentSize = content.readableBytes() > 0 &&
324                                         state == ST_CONTENT_NON_CHUNK &&
325                                         
326                                         
327                                         
328                                         content.readableBytes() <=
329                                         Math.max(COPY_CONTENT_THRESHOLD, ((int) headersEncodedSizeAccumulator) / 8);
330 
331             final int headersAndContentSize = (int) headersEncodedSizeAccumulator +
332                                                   (accountForContentSize? content.readableBytes() : 0);
333             final ByteBuf buf = ctx.alloc().buffer(headersAndContentSize);
334 
335             encodeInitialLine(buf, m);
336 
337             sanitizeHeadersBeforeEncode(m, state == ST_CONTENT_ALWAYS_EMPTY);
338 
339             encodeHeaders(m.headers(), buf);
340             ByteBufUtil.writeShortBE(buf, CRLF_SHORT);
341 
342             
343             headersEncodedSizeAccumulator = HEADERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
344                     HEADERS_WEIGHT_HISTORICAL * headersEncodedSizeAccumulator;
345 
346             encodeByteBufHttpContent(state, ctx, buf, content, msg.trailingHeaders(), out);
347         } finally {
348             msg.release();
349         }
350     }
351 
352     private static boolean encodeContentNonChunk(List<Object> out, ByteBuf buf, ByteBuf content) {
353         final int contentLength = content.readableBytes();
354         if (contentLength > 0) {
355             if (buf.maxFastWritableBytes() >= contentLength) {
356                 
357                 buf.writeBytes(content);
358                 out.add(buf);
359             } else {
360                 out.add(buf);
361                 out.add(content.retain());
362             }
363             return true;
364         }
365         return false;
366     }
367 
368     private static void throwUnexpectedMessageTypeEx(Object msg, int state) {
369         throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg)
370                 + ", state: " + state);
371     }
372 
373     private void encodeFileRegionContent(ChannelHandlerContext ctx, FileRegion msg, List<Object> out) {
374         try {
375             assert state != ST_INIT;
376             switch (state) {
377                 case ST_CONTENT_NON_CHUNK:
378                     if (msg.count() > 0) {
379                         out.add(msg.retain());
380                         break;
381                     }
382 
383                     
384                 case ST_CONTENT_ALWAYS_EMPTY:
385                     
386                     
387                     
388                     
389                     
390                     
391                     
392                     out.add(Unpooled.EMPTY_BUFFER);
393                     break;
394                 case ST_CONTENT_CHUNK:
395                     encodedChunkedFileRegionContent(ctx, msg, out);
396                     break;
397                 default:
398                     throw new Error("Unexpected http object encoder state: " + state);
399             }
400         } finally {
401             msg.release();
402         }
403     }
404 
405     
406     
407     
408     
409     
410     private static boolean bypassEncoderIfEmpty(ByteBuf msg, List<Object> out) {
411         if (!msg.isReadable()) {
412             out.add(msg.retain());
413             return true;
414         }
415         return false;
416     }
417 
418     private void encodeByteBufContent(ChannelHandlerContext ctx, ByteBuf content, List<Object> out) {
419         try {
420             assert state != ST_INIT;
421             if (bypassEncoderIfEmpty(content, out)) {
422                 return;
423             }
424             encodeByteBufAndTrailers(state, ctx, out, content, null);
425         } finally {
426             content.release();
427         }
428     }
429 
430     private static int encodeEmptyLastHttpContent(int state, List<Object> out) {
431         assert state != ST_INIT;
432 
433         switch (state) {
434             case ST_CONTENT_NON_CHUNK:
435             case ST_CONTENT_ALWAYS_EMPTY:
436                 out.add(Unpooled.EMPTY_BUFFER);
437                 break;
438             case ST_CONTENT_CHUNK:
439                 out.add(ZERO_CRLF_CRLF_BUF.duplicate());
440                 break;
441             default:
442                 throw new Error("Unexpected http object encoder state: " + state);
443         }
444         return ST_INIT;
445     }
446 
447     private void encodeLastHttpContent(ChannelHandlerContext ctx, LastHttpContent msg, List<Object> out) {
448         assert state != ST_INIT;
449         assert !(msg instanceof HttpMessage);
450         try {
451             encodeByteBufAndTrailers(state, ctx, out, msg.content(), msg.trailingHeaders());
452             state = ST_INIT;
453         } finally {
454             msg.release();
455         }
456     }
457 
458     private void encodeHttpContent(ChannelHandlerContext ctx, HttpContent msg, List<Object> out) {
459         assert state != ST_INIT;
460         assert !(msg instanceof HttpMessage);
461         assert !(msg instanceof LastHttpContent);
462         try {
463             this.encodeByteBufAndTrailers(state, ctx, out, msg.content(), null);
464         } finally {
465             msg.release();
466         }
467     }
468 
469     private void encodeByteBufAndTrailers(int state, ChannelHandlerContext ctx, List<Object> out, ByteBuf content,
470                                           HttpHeaders trailingHeaders) {
471         switch (state) {
472             case ST_CONTENT_NON_CHUNK:
473                 if (content.isReadable()) {
474                     out.add(content.retain());
475                     break;
476                 }
477                 
478             case ST_CONTENT_ALWAYS_EMPTY:
479                 out.add(Unpooled.EMPTY_BUFFER);
480                 break;
481             case ST_CONTENT_CHUNK:
482                 encodeChunkedHttpContent(ctx, content, trailingHeaders, out);
483                 break;
484             default:
485                 throw new Error("Unexpected http object encoder state: " + state);
486         }
487     }
488 
489     private void encodeChunkedHttpContent(ChannelHandlerContext ctx, ByteBuf content, HttpHeaders trailingHeaders,
490                                           List<Object> out) {
491         final int contentLength = content.readableBytes();
492         if (contentLength > 0) {
493             addEncodedLengthHex(ctx, contentLength, out);
494             out.add(content.retain());
495             out.add(CRLF_BUF.duplicate());
496         }
497         if (trailingHeaders != null) {
498             encodeTrailingHeaders(ctx, trailingHeaders, out);
499         } else if (contentLength == 0) {
500             
501             
502             out.add(content.retain());
503         }
504     }
505 
506     private void encodeTrailingHeaders(ChannelHandlerContext ctx, HttpHeaders trailingHeaders, List<Object> out) {
507         if (trailingHeaders.isEmpty()) {
508             out.add(ZERO_CRLF_CRLF_BUF.duplicate());
509         } else {
510             ByteBuf buf = ctx.alloc().buffer((int) trailersEncodedSizeAccumulator);
511             ByteBufUtil.writeMediumBE(buf, ZERO_CRLF_MEDIUM);
512             encodeHeaders(trailingHeaders, buf);
513             ByteBufUtil.writeShortBE(buf, CRLF_SHORT);
514             trailersEncodedSizeAccumulator = TRAILERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
515                     TRAILERS_WEIGHT_HISTORICAL * trailersEncodedSizeAccumulator;
516             out.add(buf);
517         }
518     }
519 
520     private ByteBuf encodeInitHttpMessage(ChannelHandlerContext ctx, H m) throws Exception {
521         assert state == ST_INIT;
522 
523         ByteBuf buf = ctx.alloc().buffer((int) headersEncodedSizeAccumulator);
524         
525         encodeInitialLine(buf, m);
526         state = isContentAlwaysEmpty(m) ? ST_CONTENT_ALWAYS_EMPTY :
527                 HttpUtil.isTransferEncodingChunked(m) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;
528 
529         sanitizeHeadersBeforeEncode(m, state == ST_CONTENT_ALWAYS_EMPTY);
530 
531         encodeHeaders(m.headers(), buf);
532         ByteBufUtil.writeShortBE(buf, CRLF_SHORT);
533 
534         headersEncodedSizeAccumulator = HEADERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
535                 HEADERS_WEIGHT_HISTORICAL * headersEncodedSizeAccumulator;
536         return buf;
537     }
538 
539     
540 
541 
542     protected void encodeHeaders(HttpHeaders headers, ByteBuf buf) {
543         Iterator<Entry<CharSequence, CharSequence>> iter = headers.iteratorCharSequence();
544         while (iter.hasNext()) {
545             Entry<CharSequence, CharSequence> header = iter.next();
546             HttpHeadersEncoder.encoderHeader(header.getKey(), header.getValue(), buf);
547         }
548     }
549 
550     private static void encodedChunkedFileRegionContent(ChannelHandlerContext ctx, FileRegion msg, List<Object> out) {
551         final long contentLength = msg.count();
552         if (contentLength > 0) {
553             addEncodedLengthHex(ctx, contentLength, out);
554             out.add(msg.retain());
555             out.add(CRLF_BUF.duplicate());
556         } else if (contentLength == 0) {
557             
558             
559             out.add(msg.retain());
560         }
561     }
562 
563     private static void addEncodedLengthHex(ChannelHandlerContext ctx, long contentLength, List<Object> out) {
564         String lengthHex = Long.toHexString(contentLength);
565         ByteBuf buf = ctx.alloc().buffer(lengthHex.length() + 2);
566         buf.writeCharSequence(lengthHex, CharsetUtil.US_ASCII);
567         ByteBufUtil.writeShortBE(buf, CRLF_SHORT);
568         out.add(buf);
569     }
570 
571     
572 
573 
    /**
     * Hook invoked after the initial line has been encoded and before the headers are encoded, giving
     * subclasses the chance to adjust the headers of {@code msg}. This implementation does nothing.
     *
     * @param msg           the message whose headers are about to be encoded
     * @param isAlwaysEmpty {@code true} if {@link #isContentAlwaysEmpty(HttpMessage)} returned {@code true}
     *                      for {@code msg}
     */
    protected void sanitizeHeadersBeforeEncode(@SuppressWarnings("unused") H msg, boolean isAlwaysEmpty) {
        // Intentionally empty: no sanitizing by default.
    }
577 
578     
579 
580 
581 
582 
583 
584 
    /**
     * Hook that tells the encoder whether the content of {@code msg} must never be written. When it returns
     * {@code true} the encoder selects the always-empty content state, in which content is replaced by empty
     * buffers. This implementation always returns {@code false}.
     *
     * @param msg the message whose head is being encoded
     * @return {@code true} if the content is always empty, {@code false} otherwise
     */
    protected boolean isContentAlwaysEmpty(@SuppressWarnings("unused") H msg) {
        return false;
    }
588 
589     @Override
590     @SuppressWarnings("ConditionCoveredByFurtherCondition")
591     public boolean acceptOutboundMessage(Object msg) throws Exception {
592         return msg == Unpooled.EMPTY_BUFFER ||
593                 msg == LastHttpContent.EMPTY_LAST_CONTENT ||
594                 msg instanceof FullHttpMessage ||
595                 msg instanceof HttpMessage ||
596                 msg instanceof LastHttpContent ||
597                 msg instanceof HttpContent ||
598                 msg instanceof ByteBuf || msg instanceof FileRegion;
599     }
600 
601     
602 
603 
604 
605 
606 
607 
608     private static int padSizeForAccumulation(int readableBytes) {
609         return (readableBytes << 2) / 3;
610     }
611 
    /**
     * Writes {@code s} to {@code buf} encoded as US-ASCII.
     *
     * @param s   the string to write
     * @param buf the destination buffer
     * @deprecated this is a thin wrapper; it is equivalent to calling
     *             {@code buf.writeCharSequence(s, CharsetUtil.US_ASCII)} directly.
     */
    @Deprecated
    protected static void encodeAscii(String s, ByteBuf buf) {
        buf.writeCharSequence(s, CharsetUtil.US_ASCII);
    }
616 
    /**
     * Encodes the initial line of {@code message} into {@code buf}. It is called on a freshly allocated
     * buffer, before the headers are encoded.
     *
     * @param buf     the buffer to write the initial line to
     * @param message the message whose initial line is encoded
     * @throws Exception if the initial line cannot be encoded
     */
    protected abstract void encodeInitialLine(ByteBuf buf, H message) throws Exception;
618 }