1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelInboundHandlerAdapter;
21 import io.netty.channel.embedded.EmbeddedChannel;
22 import io.netty.handler.codec.CodecException;
23 import io.netty.handler.codec.DecoderResult;
24 import io.netty.handler.codec.MessageToMessageDecoder;
25 import io.netty.util.ReferenceCountUtil;
26
27 import java.util.List;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObject> {
49
50 static final String IDENTITY = HttpHeaderValues.IDENTITY.toString();
51
52 protected ChannelHandlerContext ctx;
53 private EmbeddedChannel decoder;
54 private boolean continueResponse;
55 private boolean needRead = true;
56 private ByteBufForwarder forwarder;
57
58 @Override
59 protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
60 needRead = true;
61 if (msg instanceof HttpResponse && ((HttpResponse) msg).status().code() == 100) {
62
63 if (!(msg instanceof LastHttpContent)) {
64 continueResponse = true;
65 }
66
67 needRead = false;
68 ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
69 return;
70 }
71
72 if (continueResponse) {
73 if (msg instanceof LastHttpContent) {
74 continueResponse = false;
75 }
76 needRead = false;
77 ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
78 return;
79 }
80
81 if (msg instanceof HttpMessage) {
82 cleanup();
83 final HttpMessage message = (HttpMessage) msg;
84 final HttpHeaders headers = message.headers();
85
86
87 String contentEncoding = headers.get(HttpHeaderNames.CONTENT_ENCODING);
88 if (contentEncoding != null) {
89 contentEncoding = contentEncoding.trim();
90 } else {
91 String transferEncoding = headers.get(HttpHeaderNames.TRANSFER_ENCODING);
92 if (transferEncoding != null) {
93 int idx = transferEncoding.indexOf(",");
94 if (idx != -1) {
95 contentEncoding = transferEncoding.substring(0, idx).trim();
96 } else {
97 contentEncoding = transferEncoding.trim();
98 }
99 } else {
100 contentEncoding = IDENTITY;
101 }
102 }
103 decoder = newContentDecoder(contentEncoding);
104
105 if (decoder == null) {
106 if (message instanceof HttpContent) {
107 ((HttpContent) message).retain();
108 }
109 needRead = false;
110 ctx.fireChannelRead(message);
111 return;
112 }
113 decoder.pipeline().addLast(forwarder);
114
115
116
117
118
119 if (headers.contains(HttpHeaderNames.CONTENT_LENGTH)) {
120 headers.remove(HttpHeaderNames.CONTENT_LENGTH);
121 headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
122 }
123
124
125
126
127 CharSequence targetContentEncoding = getTargetContentEncoding(contentEncoding);
128 if (HttpHeaderValues.IDENTITY.contentEquals(targetContentEncoding)) {
129
130
131 headers.remove(HttpHeaderNames.CONTENT_ENCODING);
132 } else {
133 headers.set(HttpHeaderNames.CONTENT_ENCODING, targetContentEncoding);
134 }
135
136 if (message instanceof HttpContent) {
137
138
139
140
141 HttpMessage copy;
142 if (message instanceof HttpRequest) {
143 HttpRequest r = (HttpRequest) message;
144 copy = new DefaultHttpRequest(r.protocolVersion(), r.method(), r.uri());
145 } else if (message instanceof HttpResponse) {
146 HttpResponse r = (HttpResponse) message;
147 copy = new DefaultHttpResponse(r.protocolVersion(), r.status());
148 } else {
149 throw new CodecException("Object of class " + message.getClass().getName() +
150 " is not an HttpRequest or HttpResponse");
151 }
152 copy.headers().set(message.headers());
153 copy.setDecoderResult(message.decoderResult());
154 needRead = false;
155 ctx.fireChannelRead(copy);
156 } else {
157 needRead = false;
158 ctx.fireChannelRead(message);
159 }
160 }
161
162 if (msg instanceof HttpContent) {
163 final HttpContent c = (HttpContent) msg;
164 if (decoder == null) {
165 needRead = false;
166 ctx.fireChannelRead(c.retain());
167 } else {
168
169 decoder.writeInbound(c.content().retain());
170
171 if (c instanceof LastHttpContent) {
172 boolean notEmpty = decoder.finish();
173 decoder = null;
174 assert !notEmpty;
175 LastHttpContent last = (LastHttpContent) c;
176
177
178 HttpHeaders headers = last.trailingHeaders();
179 needRead = false;
180 if (headers.isEmpty()) {
181 ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
182 } else {
183 ctx.fireChannelRead(new ComposedLastHttpContent(headers, DecoderResult.SUCCESS));
184 }
185 }
186 }
187 }
188 }
189
190 @Override
191 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
192 boolean needRead = this.needRead;
193 this.needRead = true;
194
195 try {
196 ctx.fireChannelReadComplete();
197 } finally {
198 if (needRead && !ctx.channel().config().isAutoRead()) {
199 ctx.read();
200 }
201 }
202 }
203
204
205
206
207
208
209
210
211
212
213 protected abstract EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception;
214
215
216
217
218
219
220
221
222
223 protected String getTargetContentEncoding(
224 @SuppressWarnings("UnusedParameters") String contentEncoding) throws Exception {
225 return IDENTITY;
226 }
227
228 @Override
229 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
230 cleanupSafely(ctx);
231 super.handlerRemoved(ctx);
232 }
233
234 @Override
235 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
236 cleanupSafely(ctx);
237 super.channelInactive(ctx);
238 }
239
240 @Override
241 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
242 this.ctx = ctx;
243 forwarder = new ByteBufForwarder(ctx);
244 super.handlerAdded(ctx);
245 }
246
247 private void cleanup() {
248 if (decoder != null) {
249
250 decoder.finishAndReleaseAll();
251 decoder = null;
252 }
253 }
254
255 private void cleanupSafely(ChannelHandlerContext ctx) {
256 try {
257 cleanup();
258 } catch (Throwable cause) {
259
260
261 ctx.fireExceptionCaught(cause);
262 }
263 }
264
265 private final class ByteBufForwarder extends ChannelInboundHandlerAdapter {
266
267 private final ChannelHandlerContext targetCtx;
268
269 ByteBufForwarder(ChannelHandlerContext targetCtx) {
270 this.targetCtx = targetCtx;
271 }
272
273 @Override
274 public boolean isSharable() {
275
276
277 return true;
278 }
279
280 @Override
281 public void channelRead(ChannelHandlerContext ctx, Object msg) {
282 ByteBuf buf = (ByteBuf) msg;
283 if (!buf.isReadable()) {
284 buf.release();
285 return;
286 }
287 needRead = false;
288 targetCtx.fireChannelRead(new DefaultHttpContent(buf));
289 }
290 }
291 }