1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufHolder;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.embedded.EmbeddedChannel;
22 import io.netty.handler.codec.MessageToMessageCodec;
23 import io.netty.handler.codec.http.HttpHeaders.Names;
24 import io.netty.handler.codec.http.HttpHeaders.Values;
25 import io.netty.util.ReferenceCountUtil;
26
27 import java.util.ArrayDeque;
28 import java.util.List;
29 import java.util.Queue;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpRequest, HttpObject> {
54
55 private enum State {
56 PASS_THROUGH,
57 AWAIT_HEADERS,
58 AWAIT_CONTENT
59 }
60
61 private static final CharSequence ZERO_LENGTH_HEAD = "HEAD";
62 private static final CharSequence ZERO_LENGTH_CONNECT = "CONNECT";
63 private static final int CONTINUE_CODE = HttpResponseStatus.CONTINUE.code();
64
65 private final Queue<CharSequence> acceptEncodingQueue = new ArrayDeque<CharSequence>();
66 private EmbeddedChannel encoder;
67 private State state = State.AWAIT_HEADERS;
68
69 @Override
70 public boolean acceptOutboundMessage(Object msg) throws Exception {
71 return msg instanceof HttpContent || msg instanceof HttpResponse;
72 }
73
74 @Override
75 protected void decode(ChannelHandlerContext ctx, HttpRequest msg, List<Object> out)
76 throws Exception {
77 CharSequence acceptedEncoding = msg.headers().get(Names.ACCEPT_ENCODING);
78 if (acceptedEncoding == null) {
79 acceptedEncoding = Values.IDENTITY;
80 }
81
82 HttpMethod meth = msg.getMethod();
83 if (meth == HttpMethod.HEAD) {
84 acceptedEncoding = ZERO_LENGTH_HEAD;
85 } else if (meth == HttpMethod.CONNECT) {
86 acceptedEncoding = ZERO_LENGTH_CONNECT;
87 }
88
89 acceptEncodingQueue.add(acceptedEncoding);
90 out.add(ReferenceCountUtil.retain(msg));
91 }
92
93 @Override
94 protected void encode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
95 final boolean isFull = msg instanceof HttpResponse && msg instanceof LastHttpContent;
96 switch (state) {
97 case AWAIT_HEADERS: {
98 ensureHeaders(msg);
99 assert encoder == null;
100
101 final HttpResponse res = (HttpResponse) msg;
102 final int code = res.getStatus().code();
103 final CharSequence acceptEncoding;
104 if (code == CONTINUE_CODE) {
105
106
107 acceptEncoding = null;
108 } else {
109
110 acceptEncoding = acceptEncodingQueue.poll();
111 if (acceptEncoding == null) {
112 throw new IllegalStateException("cannot send more responses than requests");
113 }
114 }
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 if (isPassthru(res.getProtocolVersion(), code, acceptEncoding)) {
130 if (isFull) {
131 out.add(ReferenceCountUtil.retain(res));
132 } else {
133 out.add(res);
134
135 state = State.PASS_THROUGH;
136 }
137 break;
138 }
139
140 if (isFull) {
141
142 if (!((ByteBufHolder) res).content().isReadable()) {
143 out.add(ReferenceCountUtil.retain(res));
144 break;
145 }
146 }
147
148
149 final Result result = beginEncode(res, acceptEncoding.toString());
150
151
152 if (result == null) {
153 if (isFull) {
154 out.add(ReferenceCountUtil.retain(res));
155 } else {
156 out.add(res);
157
158 state = State.PASS_THROUGH;
159 }
160 break;
161 }
162
163 encoder = result.contentEncoder();
164
165
166
167 res.headers().set(Names.CONTENT_ENCODING, result.targetContentEncoding());
168
169
170 if (isFull) {
171
172 HttpResponse newRes = new DefaultHttpResponse(res.getProtocolVersion(), res.getStatus());
173 newRes.headers().set(res.headers());
174 out.add(newRes);
175
176 ensureContent(res);
177 encodeFullResponse(newRes, (HttpContent) res, out);
178 break;
179 } else {
180
181 res.headers().remove(Names.CONTENT_LENGTH);
182 res.headers().set(Names.TRANSFER_ENCODING, Values.CHUNKED);
183
184 out.add(res);
185 state = State.AWAIT_CONTENT;
186 if (!(msg instanceof HttpContent)) {
187
188
189 break;
190 }
191
192 }
193 }
194 case AWAIT_CONTENT: {
195 ensureContent(msg);
196 if (encodeContent((HttpContent) msg, out)) {
197 state = State.AWAIT_HEADERS;
198 }
199 break;
200 }
201 case PASS_THROUGH: {
202 ensureContent(msg);
203 out.add(ReferenceCountUtil.retain(msg));
204
205 if (msg instanceof LastHttpContent) {
206 state = State.AWAIT_HEADERS;
207 }
208 break;
209 }
210 }
211 }
212
213 private void encodeFullResponse(HttpResponse newRes, HttpContent content, List<Object> out) {
214 int existingMessages = out.size();
215 encodeContent(content, out);
216
217 if (HttpHeaders.isContentLengthSet(newRes)) {
218
219 int messageSize = 0;
220 for (int i = existingMessages; i < out.size(); i++) {
221 Object item = out.get(i);
222 if (item instanceof HttpContent) {
223 messageSize += ((HttpContent) item).content().readableBytes();
224 }
225 }
226 HttpHeaders.setContentLength(newRes, messageSize);
227 } else {
228 newRes.headers().set(Names.TRANSFER_ENCODING, Values.CHUNKED);
229 }
230 }
231
232 private static boolean isPassthru(HttpVersion version, int code, CharSequence httpMethod) {
233 return code < 200 || code == 204 || code == 304 ||
234 (httpMethod == ZERO_LENGTH_HEAD || (httpMethod == ZERO_LENGTH_CONNECT && code == 200)) ||
235 version == HttpVersion.HTTP_1_0;
236 }
237
238 private static void ensureHeaders(HttpObject msg) {
239 if (!(msg instanceof HttpResponse)) {
240 throw new IllegalStateException(
241 "unexpected message type: " +
242 msg.getClass().getName() + " (expected: " + HttpResponse.class.getSimpleName() + ')');
243 }
244 }
245
246 private static void ensureContent(HttpObject msg) {
247 if (!(msg instanceof HttpContent)) {
248 throw new IllegalStateException(
249 "unexpected message type: " +
250 msg.getClass().getName() + " (expected: " + HttpContent.class.getSimpleName() + ')');
251 }
252 }
253
254 private boolean encodeContent(HttpContent c, List<Object> out) {
255 ByteBuf content = c.content();
256
257 encode(content, out);
258
259 if (c instanceof LastHttpContent) {
260 finishEncode(out);
261 LastHttpContent last = (LastHttpContent) c;
262
263
264
265 HttpHeaders headers = last.trailingHeaders();
266 if (headers.isEmpty()) {
267 out.add(LastHttpContent.EMPTY_LAST_CONTENT);
268 } else {
269 out.add(new ComposedLastHttpContent(headers));
270 }
271 return true;
272 }
273 return false;
274 }
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290 protected abstract Result beginEncode(HttpResponse headers, String acceptEncoding) throws Exception;
291
292 @Override
293 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
294 cleanupSafely(ctx);
295 super.handlerRemoved(ctx);
296 }
297
298 @Override
299 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
300 cleanupSafely(ctx);
301 super.channelInactive(ctx);
302 }
303
304 private void cleanup() {
305 if (encoder != null) {
306
307 encoder.finishAndReleaseAll();
308 encoder = null;
309 }
310 }
311
312 private void cleanupSafely(ChannelHandlerContext ctx) {
313 try {
314 cleanup();
315 } catch (Throwable cause) {
316
317
318 ctx.fireExceptionCaught(cause);
319 }
320 }
321
322 private void encode(ByteBuf in, List<Object> out) {
323
324 encoder.writeOutbound(in.retain());
325 fetchEncoderOutput(out);
326 }
327
328 private void finishEncode(List<Object> out) {
329 if (encoder.finish()) {
330 fetchEncoderOutput(out);
331 }
332 encoder = null;
333 }
334
335 private void fetchEncoderOutput(List<Object> out) {
336 for (;;) {
337 ByteBuf buf = (ByteBuf) encoder.readOutbound();
338 if (buf == null) {
339 break;
340 }
341 if (!buf.isReadable()) {
342 buf.release();
343 continue;
344 }
345 out.add(new DefaultHttpContent(buf));
346 }
347 }
348
349 public static final class Result {
350 private final String targetContentEncoding;
351 private final EmbeddedChannel contentEncoder;
352
353 public Result(String targetContentEncoding, EmbeddedChannel contentEncoder) {
354 if (targetContentEncoding == null) {
355 throw new NullPointerException("targetContentEncoding");
356 }
357 if (contentEncoder == null) {
358 throw new NullPointerException("contentEncoder");
359 }
360
361 this.targetContentEncoding = targetContentEncoding;
362 this.contentEncoder = contentEncoder;
363 }
364
365 public String targetContentEncoding() {
366 return targetContentEncoding;
367 }
368
369 public EmbeddedChannel contentEncoder() {
370 return contentEncoder;
371 }
372 }
373 }