1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelInboundHandlerAdapter;
21 import io.netty.channel.embedded.EmbeddedChannel;
22 import io.netty.handler.codec.CodecException;
23 import io.netty.handler.codec.DecoderResult;
24 import io.netty.handler.codec.MessageToMessageDecoder;
25 import io.netty.util.ReferenceCountUtil;
26
27 import java.util.List;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObject> {
49
50 static final String IDENTITY = HttpHeaderValues.IDENTITY.toString();
51
52 protected ChannelHandlerContext ctx;
53 private EmbeddedChannel decoder;
54 private boolean continueResponse;
55 private boolean needRead = true;
56 private ByteBufForwarder forwarder;
57
58 public HttpContentDecoder() {
59 super(HttpObject.class);
60 }
61
62 @Override
63 protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
64 needRead = true;
65 if (msg instanceof HttpResponse && ((HttpResponse) msg).status().code() == 100) {
66
67 if (!(msg instanceof LastHttpContent)) {
68 continueResponse = true;
69 }
70
71 needRead = false;
72 ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
73 return;
74 }
75
76 if (continueResponse) {
77 if (msg instanceof LastHttpContent) {
78 continueResponse = false;
79 }
80
81 needRead = false;
82 ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
83 return;
84 }
85
86 if (msg instanceof HttpMessage) {
87 cleanup();
88 final HttpMessage message = (HttpMessage) msg;
89 final HttpHeaders headers = message.headers();
90
91
92 String contentEncoding = headers.get(HttpHeaderNames.CONTENT_ENCODING);
93 if (contentEncoding != null) {
94 contentEncoding = contentEncoding.trim();
95 } else {
96 String transferEncoding = headers.get(HttpHeaderNames.TRANSFER_ENCODING);
97 if (transferEncoding != null) {
98 int idx = transferEncoding.indexOf(',');
99 if (idx != -1) {
100 contentEncoding = transferEncoding.substring(0, idx).trim();
101 } else {
102 contentEncoding = transferEncoding.trim();
103 }
104 } else {
105 contentEncoding = IDENTITY;
106 }
107 }
108 decoder = newContentDecoder(contentEncoding);
109
110 if (decoder == null) {
111 if (message instanceof HttpContent) {
112 ((HttpContent) message).retain();
113 }
114 needRead = false;
115 ctx.fireChannelRead(message);
116 return;
117 }
118 decoder.pipeline().addLast(forwarder);
119
120
121
122
123 if (headers.contains(HttpHeaderNames.CONTENT_LENGTH)) {
124 headers.remove(HttpHeaderNames.CONTENT_LENGTH);
125 headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
126 }
127
128
129
130
131 CharSequence targetContentEncoding = getTargetContentEncoding(contentEncoding);
132 if (HttpHeaderValues.IDENTITY.contentEquals(targetContentEncoding)) {
133
134
135 headers.remove(HttpHeaderNames.CONTENT_ENCODING);
136 } else {
137 headers.set(HttpHeaderNames.CONTENT_ENCODING, targetContentEncoding);
138 }
139
140 if (message instanceof HttpContent) {
141
142
143
144
145 HttpMessage copy;
146 if (message instanceof HttpRequest) {
147 HttpRequest r = (HttpRequest) message;
148 copy = new DefaultHttpRequest(r.protocolVersion(), r.method(), r.uri());
149 } else if (message instanceof HttpResponse) {
150 HttpResponse r = (HttpResponse) message;
151 copy = new DefaultHttpResponse(r.protocolVersion(), r.status());
152 } else {
153 throw new CodecException("Object of class " + message.getClass().getName() +
154 " is not an HttpRequest or HttpResponse");
155 }
156 copy.headers().set(message.headers());
157 copy.setDecoderResult(message.decoderResult());
158 needRead = false;
159 ctx.fireChannelRead(copy);
160 } else {
161 needRead = false;
162 ctx.fireChannelRead(message);
163 }
164 }
165
166 if (msg instanceof HttpContent) {
167 final HttpContent c = (HttpContent) msg;
168 if (decoder == null) {
169 needRead = false;
170 ctx.fireChannelRead(c.retain());
171 } else {
172
173 decoder.writeInbound(c.content().retain());
174
175 if (c instanceof LastHttpContent) {
176 boolean notEmpty = decoder.finish();
177 decoder = null;
178 assert !notEmpty;
179 LastHttpContent last = (LastHttpContent) c;
180
181
182 HttpHeaders headers = last.trailingHeaders();
183 needRead = false;
184 if (headers.isEmpty()) {
185 ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
186 } else {
187 ctx.fireChannelRead(new ComposedLastHttpContent(headers, DecoderResult.SUCCESS));
188 }
189 }
190 }
191 }
192 }
193
194 @Override
195 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
196 boolean needRead = this.needRead;
197 this.needRead = true;
198
199 try {
200 ctx.fireChannelReadComplete();
201 } finally {
202 if (needRead && !ctx.channel().config().isAutoRead()) {
203 ctx.read();
204 }
205 }
206 }
207
208
209
210
211
212
213
214
215
216
217 protected abstract EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception;
218
219
220
221
222
223
224
225
226
227 protected String getTargetContentEncoding(
228 @SuppressWarnings("UnusedParameters") String contentEncoding) throws Exception {
229 return IDENTITY;
230 }
231
232 @Override
233 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
234 cleanupSafely(ctx);
235 super.handlerRemoved(ctx);
236 }
237
238 @Override
239 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
240 cleanupSafely(ctx);
241 super.channelInactive(ctx);
242 }
243
244 @Override
245 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
246 this.ctx = ctx;
247 forwarder = new ByteBufForwarder(ctx);
248 super.handlerAdded(ctx);
249 }
250
251 private void cleanup() {
252 if (decoder != null) {
253
254 boolean nonEmpty = decoder.finishAndReleaseAll();
255 decoder = null;
256 assert !nonEmpty;
257 }
258 }
259
260 private void cleanupSafely(ChannelHandlerContext ctx) {
261 try {
262 cleanup();
263 } catch (Throwable cause) {
264
265
266 ctx.fireExceptionCaught(cause);
267 }
268 }
269
270 private final class ByteBufForwarder extends ChannelInboundHandlerAdapter {
271
272 private final ChannelHandlerContext targetCtx;
273
274 ByteBufForwarder(ChannelHandlerContext targetCtx) {
275 this.targetCtx = targetCtx;
276 }
277
278 @Override
279 public boolean isSharable() {
280
281
282 return true;
283 }
284
285 @Override
286 public void channelRead(ChannelHandlerContext ctx, Object msg) {
287 ByteBuf buf = (ByteBuf) msg;
288 if (!buf.isReadable()) {
289 buf.release();
290 return;
291 }
292 needRead = false;
293 targetCtx.fireChannelRead(new DefaultHttpContent(buf));
294 }
295 }
296 }