View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.http;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.channel.ChannelHandlerContext;
21  import io.netty5.channel.FileRegion;
22  import io.netty5.handler.codec.MessageToMessageEncoder;
23  import io.netty5.util.CharsetUtil;
24  import io.netty5.util.Resource;
25  import io.netty5.util.internal.StringUtil;
26  
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map.Entry;
30  import java.util.function.Supplier;
31  
32  import static io.netty5.handler.codec.http.HttpConstants.CR;
33  import static io.netty5.handler.codec.http.HttpConstants.LF;
34  
35  /**
36   * Encodes an {@link HttpMessage} or an {@link HttpContent} into
37   * a {@link Buffer}.
38   *
39   * <h3>Extensibility</h3>
40   *
41   * Please note that this encoder is designed to be extended to implement
42   * a protocol derived from HTTP, such as
43   * <a href="https://en.wikipedia.org/wiki/Real_Time_Streaming_Protocol">RTSP</a> and
44   * <a href="https://en.wikipedia.org/wiki/Internet_Content_Adaptation_Protocol">ICAP</a>.
45   * To implement the encoder of such a derived protocol, extend this class and
46   * implement all abstract methods properly.
47   */
48  public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToMessageEncoder<Object> {
49      static final short CRLF_SHORT = (CR << 8) | LF;
50      private static final int ZERO_CRLF_MEDIUM = ('0' << 16) | CRLF_SHORT;
51      private static final byte[] CRLF = {CR, LF};
52      private static final byte[] ZERO_CRLF_CRLF = { '0', CR, LF, CR, LF };
53      private static final float HEADERS_WEIGHT_NEW = 1 / 5f;
54      private static final float HEADERS_WEIGHT_HISTORICAL = 1 - HEADERS_WEIGHT_NEW;
55      private static final float TRAILERS_WEIGHT_NEW = HEADERS_WEIGHT_NEW;
56      private static final float TRAILERS_WEIGHT_HISTORICAL = HEADERS_WEIGHT_HISTORICAL;
57  
58      private static final int ST_INIT = 0;
59      private static final int ST_CONTENT_NON_CHUNK = 1;
60      private static final int ST_CONTENT_CHUNK = 2;
61      private static final int ST_CONTENT_ALWAYS_EMPTY = 3;
62  
63      private Supplier<Buffer> crlfBufferSupplier;
64      private Supplier<Buffer> zeroCrlfCrlfBufferSupplier;
65  
66      @SuppressWarnings("RedundantFieldInitialization")
67      private int state = ST_INIT;
68  
69      /**
70       * Used to calculate an exponential moving average of the encoded size of the initial line and the headers for
71       * a guess for future buffer allocations.
72       */
73      private float headersEncodedSizeAccumulator = 256;
74  
75      /**
76       * Used to calculate an exponential moving average of the encoded size of the trailers for
77       * a guess for future buffer allocations.
78       */
79      private float trailersEncodedSizeAccumulator = 256;
80  
81      @Override
82      protected void encodeAndClose(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
83          Buffer buf = null;
84          if (msg instanceof HttpMessage) {
85              if (state != ST_INIT) {
86                  throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg)
87                          + ", state: " + state);
88              }
89  
90              @SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })
91              H m = (H) msg;
92  
93              buf = ctx.bufferAllocator().allocate((int) headersEncodedSizeAccumulator);
94              // Encode the message.
95              encodeInitialLine(buf, m);
96              state = isContentAlwaysEmpty(m) ? ST_CONTENT_ALWAYS_EMPTY :
97                      HttpUtil.isTransferEncodingChunked(m) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;
98  
99              sanitizeHeadersBeforeEncode(m, state == ST_CONTENT_ALWAYS_EMPTY);
100 
101             encodeHeaders(m.headers(), buf);
102             buf.writeShort(CRLF_SHORT);
103 
104             headersEncodedSizeAccumulator = HEADERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
105                                             HEADERS_WEIGHT_HISTORICAL * headersEncodedSizeAccumulator;
106         }
107 
108         // Bypass the encoder in case of an empty buffer, so that the following idiom works:
109         //
110         //     ch.write(ctx.bufferAllocator().allocate(0)).addListener(ch, ChannelFutureListeners.CLOSE);
111         //
112         // See https://github.com/netty/netty/issues/2983 for more information.
113         if (msg instanceof Buffer && ((Buffer) msg).readableBytes() == 0) {
114             out.add(msg);
115             return;
116         }
117 
118         if (msg instanceof HttpContent || msg instanceof Buffer || msg instanceof FileRegion) {
119             switch (state) {
120                 case ST_INIT:
121                     throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg)
122                         + ", state: " + state);
123                 case ST_CONTENT_NON_CHUNK:
124                     final long contentLength = contentLength(msg);
125                     if (contentLength > 0) {
126                         if (buf != null && buf.writableBytes() >= contentLength && msg instanceof HttpContent) {
127                             // merge into other buffer for performance reasons
128                             buf.writeBytes(((HttpContent<?>) msg).payload());
129                             Resource.dispose(msg);
130                             out.add(buf);
131                         } else {
132                             if (buf != null) {
133                                 out.add(buf);
134                             }
135                             out.add(encode(msg));
136                         }
137 
138                         if (msg instanceof LastHttpContent) {
139                             state = ST_INIT;
140                         }
141 
142                         break;
143                     }
144 
145                     // fall-through!
146                 case ST_CONTENT_ALWAYS_EMPTY:
147 
148                     if (buf != null) {
149                         // We allocated a buffer so add it now.
150                         out.add(buf);
151                     } else {
152                         // Need to produce some output otherwise an IllegalStateException will be thrown as we did not
153                         // write anything. Writing an empty buffer will not actually write anything on the wire, so if
154                         // there is a user error with msg it will not be visible externally
155                         out.add(ctx.bufferAllocator().allocate(0));
156                     }
157 
158                     break;
159                 case ST_CONTENT_CHUNK:
160                     if (buf != null) {
161                         // We allocated a buffer so add it now.
162                         out.add(buf);
163                     }
164                     encodeChunkedContent(ctx, msg, contentLength(msg), out);
165 
166                     break;
167                 default:
168                     throw new Error();
169             }
170 
171             if (msg instanceof LastHttpContent) {
172                 state = ST_INIT;
173             }
174         } else if (buf != null) {
175             out.add(buf);
176         }
177     }
178 
179     /**
180      * Encode the {@link HttpHeaders} into a {@link Buffer}.
181      */
182     protected void encodeHeaders(HttpHeaders headers, Buffer buf) {
183         Iterator<Entry<CharSequence, CharSequence>> iter = headers.iteratorCharSequence();
184         while (iter.hasNext()) {
185             Entry<CharSequence, CharSequence> header = iter.next();
186             HttpHeadersEncoder.encoderHeader(header.getKey(), header.getValue(), buf);
187         }
188     }
189 
190     private void encodeChunkedContent(ChannelHandlerContext ctx, Object msg, long contentLength, List<Object> out) {
191         if (contentLength > 0) {
192             String lengthHex = Long.toHexString(contentLength);
193             Buffer buf = ctx.bufferAllocator().allocate(lengthHex.length() + 2);
194             buf.writeCharSequence(lengthHex, CharsetUtil.US_ASCII);
195             buf.writeShort(CRLF_SHORT);
196             out.add(buf);
197             out.add(encode(msg));
198             out.add(crlfBuffer(ctx.bufferAllocator()));
199         }
200 
201         if (msg instanceof LastHttpContent) {
202             HttpHeaders headers = ((LastHttpContent<?>) msg).trailingHeaders();
203             if (headers.isEmpty()) {
204                 out.add(zeroCrlfCrlfBuffer(ctx.bufferAllocator()));
205             } else {
206                 Buffer buf = ctx.bufferAllocator().allocate((int) trailersEncodedSizeAccumulator);
207                 buf.writeMedium(ZERO_CRLF_MEDIUM);
208                 encodeHeaders(headers, buf);
209                 buf.writeShort(CRLF_SHORT);
210                 trailersEncodedSizeAccumulator = TRAILERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
211                                                  TRAILERS_WEIGHT_HISTORICAL * trailersEncodedSizeAccumulator;
212                 out.add(buf);
213             }
214         } else if (contentLength == 0) {
215             // Need to produce some output otherwise an
216             // IllegalStateException will be thrown
217             out.add(encode(msg));
218         }
219     }
220 
221     /**
222      * Allows to sanitize headers of the message before encoding these.
223      */
224     protected void sanitizeHeadersBeforeEncode(@SuppressWarnings("unused") H msg, boolean isAlwaysEmpty) {
225         // noop
226     }
227 
228     /**
229      * Determine whether a message has a content or not. Some message may have headers indicating
230      * a content without having an actual content, e.g the response to an HEAD or CONNECT request.
231      *
232      * @param msg the message to test
233      * @return {@code true} to signal the message has no content
234      */
235     protected boolean isContentAlwaysEmpty(@SuppressWarnings("unused") H msg) {
236         return false;
237     }
238 
239     @Override
240     public boolean acceptOutboundMessage(Object msg) throws Exception {
241         return msg instanceof HttpObject || msg instanceof Buffer || msg instanceof FileRegion;
242     }
243 
244     private static Object encode(Object msg) {
245         if (msg instanceof Buffer) {
246             return msg;
247         }
248         if (msg instanceof HttpContent) {
249             return ((HttpContent<?>) msg).payload();
250         }
251         if (msg instanceof FileRegion) {
252             return msg;
253         }
254         Resource.dispose(msg);
255         throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
256     }
257 
258     private static long contentLength(Object msg) {
259         if (msg instanceof HttpContent) {
260             return ((HttpContent<?>) msg).payload().readableBytes();
261         }
262         if (msg instanceof Buffer) {
263             return ((Buffer) msg).readableBytes();
264         }
265         if (msg instanceof FileRegion) {
266             return ((FileRegion) msg).count();
267         }
268         Resource.dispose(msg);
269         throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));
270     }
271 
272     /**
273      * Add some additional overhead to the buffer. The rational is that it is better to slightly over allocate and waste
274      * some memory, rather than under allocate and require a resize/copy.
275      * @param readableBytes The readable bytes in the buffer.
276      * @return The {@code readableBytes} with some additional padding.
277      */
278     private static int padSizeForAccumulation(int readableBytes) {
279         return (readableBytes << 2) / 3;
280     }
281 
282     protected abstract void encodeInitialLine(Buffer buf, H message) throws Exception;
283 
284     protected Buffer crlfBuffer(BufferAllocator allocator) {
285         if (crlfBufferSupplier == null) {
286             crlfBufferSupplier = allocator.constBufferSupplier(CRLF);
287         }
288         return crlfBufferSupplier.get();
289     }
290 
291     protected Buffer zeroCrlfCrlfBuffer(BufferAllocator allocator) {
292         if (zeroCrlfCrlfBufferSupplier == null) {
293             zeroCrlfCrlfBufferSupplier = allocator.constBufferSupplier(ZERO_CRLF_CRLF);
294         }
295         return zeroCrlfCrlfBufferSupplier.get();
296     }
297 }