1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.channel.ChannelHandlerContext;
21 import io.netty5.channel.embedded.EmbeddedChannel;
22 import io.netty5.handler.codec.MessageToMessageCodec;
23 import io.netty5.handler.codec.compression.Compressor;
24 import io.netty5.util.Resource;
25 import io.netty5.util.internal.StringUtil;
26
27 import java.util.ArrayDeque;
28 import java.util.List;
29 import java.util.Queue;
30
31 import static io.netty5.handler.codec.http.HttpHeaderNames.ACCEPT_ENCODING;
32 import static java.util.Objects.requireNonNull;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpRequest, HttpObject> {
57
58 private enum State {
59 PASS_THROUGH,
60 AWAIT_HEADERS,
61 AWAIT_CONTENT
62 }
63
64 private static final CharSequence ZERO_LENGTH_HEAD = "HEAD";
65 private static final CharSequence ZERO_LENGTH_CONNECT = "CONNECT";
66 private static final int CONTINUE_CODE = HttpResponseStatus.CONTINUE.code();
67
68 private final Queue<CharSequence> acceptEncodingQueue = new ArrayDeque<>();
69 private Compressor compressor;
70 private State state = State.AWAIT_HEADERS;
71
72 @Override
73 public boolean acceptOutboundMessage(Object msg) throws Exception {
74 return msg instanceof HttpContent || msg instanceof HttpResponse;
75 }
76
77 @Override
78 protected void decode(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
79 throw new UnsupportedOperationException("HttpContentEncoder use decodeAndClose().");
80 }
81
82 @Override
83 protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
84 CharSequence acceptEncoding;
85 List<String> acceptEncodingHeaders = msg.headers().getAll(ACCEPT_ENCODING);
86 switch (acceptEncodingHeaders.size()) {
87 case 0:
88 acceptEncoding = HttpContentDecoder.IDENTITY;
89 break;
90 case 1:
91 acceptEncoding = acceptEncodingHeaders.get(0);
92 break;
93 default:
94
95 acceptEncoding = StringUtil.join(",", acceptEncodingHeaders);
96 break;
97 }
98
99 HttpMethod method = msg.method();
100 if (HttpMethod.HEAD.equals(method)) {
101 acceptEncoding = ZERO_LENGTH_HEAD;
102 } else if (HttpMethod.CONNECT.equals(method)) {
103 acceptEncoding = ZERO_LENGTH_CONNECT;
104 }
105
106 acceptEncodingQueue.add(acceptEncoding);
107 ctx.fireChannelRead(msg);
108 }
109
110 @Override
111 protected void encodeAndClose(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
112 final boolean isFull = msg instanceof HttpResponse && msg instanceof LastHttpContent;
113 boolean dispose = true;
114 switch (state) {
115 case AWAIT_HEADERS: {
116 ensureHeaders(msg);
117 assert compressor == null;
118 assert msg instanceof HttpResponse;
119
120 final HttpResponse res = (HttpResponse) msg;
121 final int code = res.status().code();
122 final CharSequence acceptEncoding;
123 if (code == CONTINUE_CODE) {
124
125
126 acceptEncoding = null;
127 } else {
128
129 acceptEncoding = acceptEncodingQueue.poll();
130 if (acceptEncoding == null) {
131 throw new IllegalStateException("cannot send more responses than requests");
132 }
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 if (isPassthru(res.protocolVersion(), code, acceptEncoding)) {
149 out.add(res);
150 dispose = false;
151 if (!isFull) {
152
153 state = State.PASS_THROUGH;
154 }
155 break;
156 }
157
158
159 if (isFull && ((LastHttpContent<?>) res).payload().readableBytes() == 0) {
160 out.add(res);
161 dispose = false;
162 break;
163 }
164
165
166 assert acceptEncoding != null;
167 final Result result = beginEncode(res, acceptEncoding.toString());
168
169
170 if (result == null) {
171 out.add(res);
172 dispose = false;
173 if (!isFull) {
174
175 state = State.PASS_THROUGH;
176 }
177 break;
178 }
179
180 compressor = result.contentCompressor();
181
182
183
184 res.headers().set(HttpHeaderNames.CONTENT_ENCODING, result.targetContentEncoding());
185
186
187 if (isFull) {
188
189 HttpResponse newRes = new DefaultHttpResponse(res.protocolVersion(), res.status());
190 newRes.headers().set(res.headers());
191 out.add(newRes);
192
193 ensureContent(res);
194 encodeFullResponse(ctx, newRes, (HttpContent<?>) res, out);
195 break;
196 } else {
197
198 res.headers().remove(HttpHeaderNames.CONTENT_LENGTH);
199 res.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
200
201 out.add(res);
202 dispose = false;
203 state = State.AWAIT_CONTENT;
204 if (!(msg instanceof HttpContent)) {
205
206
207 break;
208 }
209
210 }
211 }
212 case AWAIT_CONTENT: {
213 ensureContent(msg);
214 if (encodeContent(ctx, (HttpContent<?>) msg, out)) {
215 state = State.AWAIT_HEADERS;
216 }
217 break;
218 }
219 case PASS_THROUGH: {
220 ensureContent(msg);
221 out.add(msg);
222 dispose = false;
223
224 if (msg instanceof LastHttpContent) {
225 state = State.AWAIT_HEADERS;
226 }
227 break;
228 }
229 }
230 if (dispose) {
231 Resource.dispose(msg);
232 }
233 }
234
235 private void encodeFullResponse(ChannelHandlerContext ctx, HttpResponse newRes, HttpContent<?> content,
236 List<Object> out) {
237 int existingMessages = out.size();
238 encodeContent(ctx, content, out);
239
240 if (HttpUtil.isContentLengthSet(newRes)) {
241
242 int messageSize = 0;
243 for (int i = existingMessages; i < out.size(); i++) {
244 Object item = out.get(i);
245 if (item instanceof HttpContent) {
246 messageSize += ((HttpContent<?>) item).payload().readableBytes();
247 }
248 }
249 HttpUtil.setContentLength(newRes, messageSize);
250 } else {
251 newRes.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
252 }
253 }
254
255 private static boolean isPassthru(HttpVersion version, int code, CharSequence httpMethod) {
256 return code < 200 || code == 204 || code == 304 ||
257 (httpMethod == ZERO_LENGTH_HEAD || (httpMethod == ZERO_LENGTH_CONNECT && code == 200)) ||
258 version == HttpVersion.HTTP_1_0;
259 }
260
261 private static void ensureHeaders(HttpObject msg) {
262 if (!(msg instanceof HttpResponse)) {
263 throw new IllegalStateException(
264 "unexpected message type: " +
265 msg.getClass().getName() + " (expected: " + HttpResponse.class.getSimpleName() + ')');
266 }
267 }
268
269 private static void ensureContent(HttpObject msg) {
270 if (!(msg instanceof HttpContent)) {
271 throw new IllegalStateException(
272 "unexpected message type: " +
273 msg.getClass().getName() + " (expected: " + HttpContent.class.getSimpleName() + ')');
274 }
275 }
276
277 private boolean encodeContent(ChannelHandlerContext ctx, HttpContent<?> c, List<Object> out) {
278 Buffer content = c.payload();
279
280 encode(content, ctx.bufferAllocator(), out);
281
282 if (c instanceof LastHttpContent) {
283 finishEncode(ctx.bufferAllocator(), out);
284 LastHttpContent<?> last = (LastHttpContent<?>) c;
285
286
287
288 HttpHeaders headers = last.trailingHeaders();
289 if (headers.isEmpty()) {
290 out.add(new EmptyLastHttpContent(ctx.bufferAllocator()));
291 } else {
292 out.add(new DefaultLastHttpContent(ctx.bufferAllocator().allocate(0), headers));
293 }
294 return true;
295 }
296 return false;
297 }
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313 protected abstract Result beginEncode(HttpResponse httpResponse, String acceptEncoding) throws Exception;
314
315 @Override
316 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
317 cleanupSafely(ctx);
318 super.handlerRemoved(ctx);
319 }
320
321 @Override
322 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
323 cleanupSafely(ctx);
324 super.channelInactive(ctx);
325 }
326
327 private void cleanup() {
328 if (compressor != null) {
329
330 try {
331 compressor.close();
332 } finally {
333 compressor = null;
334 }
335 }
336 }
337
338 private void cleanupSafely(ChannelHandlerContext ctx) {
339 try {
340 cleanup();
341 } catch (Throwable cause) {
342
343
344 ctx.fireChannelExceptionCaught(cause);
345 }
346 }
347
348 private void encode(Buffer in, BufferAllocator allocator, List<Object> out) {
349 Buffer compressed = compressor.compress(in, allocator);
350 if (compressed.readableBytes() == 0) {
351 compressed.close();
352 return;
353 }
354 out.add(new DefaultHttpContent(compressed));
355 }
356
357 private void finishEncode(BufferAllocator allocator, List<Object> out) {
358 Buffer trailer = compressor.finish(allocator);
359 if (trailer.readableBytes() == 0) {
360 trailer.close();
361 return;
362 }
363 out.add(new DefaultHttpContent(trailer));
364 compressor = null;
365 }
366
367 public static final class Result {
368 private final String targetContentEncoding;
369 private final Compressor contentCompressor;
370
371 public Result(String targetContentEncoding, Compressor contentCompressor) {
372 requireNonNull(targetContentEncoding, "targetContentEncoding");
373 requireNonNull(contentCompressor, "contentCompressor");
374
375 this.targetContentEncoding = targetContentEncoding;
376 this.contentCompressor = contentCompressor;
377 }
378
379 public String targetContentEncoding() {
380 return targetContentEncoding;
381 }
382
383 public Compressor contentCompressor() {
384 return contentCompressor;
385 }
386 }
387 }