1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.CombinedChannelDuplexHandler;
23 import io.netty.handler.codec.PrematureChannelClosureException;
24 import io.netty.util.ReferenceCountUtil;
25
26 import java.util.ArrayDeque;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import static io.netty.handler.codec.http.HttpObjectDecoder.DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS;
32 import static io.netty.handler.codec.http.HttpObjectDecoder.DEFAULT_ALLOW_PARTIAL_CHUNKS;
33 import static io.netty.handler.codec.http.HttpObjectDecoder.DEFAULT_MAX_CHUNK_SIZE;
34 import static io.netty.handler.codec.http.HttpObjectDecoder.DEFAULT_MAX_HEADER_SIZE;
35 import static io.netty.handler.codec.http.HttpObjectDecoder.DEFAULT_MAX_INITIAL_LINE_LENGTH;
36 import static io.netty.handler.codec.http.HttpObjectDecoder.DEFAULT_VALIDATE_HEADERS;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>
53 implements HttpClientUpgradeHandler.SourceCodec {
54 public static final boolean DEFAULT_FAIL_ON_MISSING_RESPONSE = false;
55 public static final boolean DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST = false;
56
57
58 private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
59 private final boolean parseHttpAfterConnectRequest;
60
61
62 private boolean done;
63
64 private final AtomicLong requestResponseCounter = new AtomicLong();
65 private final boolean failOnMissingResponse;
66
67
68
69
70
71
72 public HttpClientCodec() {
73 this(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, DEFAULT_MAX_CHUNK_SIZE,
74 DEFAULT_FAIL_ON_MISSING_RESPONSE);
75 }
76
77
78
79
80 public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
81 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, DEFAULT_FAIL_ON_MISSING_RESPONSE);
82 }
83
84
85
86
87 public HttpClientCodec(
88 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse) {
89 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, DEFAULT_VALIDATE_HEADERS);
90 }
91
92
93
94
95 public HttpClientCodec(
96 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
97 boolean validateHeaders) {
98 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders,
99 DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST);
100 }
101
102
103
104
105 public HttpClientCodec(
106 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
107 boolean validateHeaders, boolean parseHttpAfterConnectRequest) {
108 init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new Encoder());
109 this.failOnMissingResponse = failOnMissingResponse;
110 this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest;
111 }
112
113
114
115
116 public HttpClientCodec(
117 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
118 boolean validateHeaders, int initialBufferSize) {
119 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders,
120 initialBufferSize, DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST);
121 }
122
123
124
125
126 public HttpClientCodec(
127 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
128 boolean validateHeaders, int initialBufferSize, boolean parseHttpAfterConnectRequest) {
129 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders,
130 initialBufferSize, parseHttpAfterConnectRequest, DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS);
131 }
132
133
134
135
136 public HttpClientCodec(
137 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
138 boolean validateHeaders, int initialBufferSize, boolean parseHttpAfterConnectRequest,
139 boolean allowDuplicateContentLengths) {
140 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders,
141 initialBufferSize, parseHttpAfterConnectRequest, allowDuplicateContentLengths,
142 DEFAULT_ALLOW_PARTIAL_CHUNKS);
143 }
144
145
146
147
148 public HttpClientCodec(
149 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
150 boolean validateHeaders, int initialBufferSize, boolean parseHttpAfterConnectRequest,
151 boolean allowDuplicateContentLengths, boolean allowPartialChunks) {
152 init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders, initialBufferSize,
153 allowDuplicateContentLengths, allowPartialChunks),
154 new Encoder());
155 this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest;
156 this.failOnMissingResponse = failOnMissingResponse;
157 }
158
159
160
161
162 @Override
163 public void prepareUpgradeFrom(ChannelHandlerContext ctx) {
164 ((Encoder) outboundHandler()).upgraded = true;
165 }
166
167
168
169
170
171 @Override
172 public void upgradeFrom(ChannelHandlerContext ctx) {
173 final ChannelPipeline p = ctx.pipeline();
174 p.remove(this);
175 }
176
177 public void setSingleDecode(boolean singleDecode) {
178 inboundHandler().setSingleDecode(singleDecode);
179 }
180
181 public boolean isSingleDecode() {
182 return inboundHandler().isSingleDecode();
183 }
184
185 private final class Encoder extends HttpRequestEncoder {
186
187 boolean upgraded;
188
189 @Override
190 protected void encode(
191 ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
192
193 if (upgraded) {
194 out.add(ReferenceCountUtil.retain(msg));
195 return;
196 }
197
198 if (msg instanceof HttpRequest) {
199 queue.offer(((HttpRequest) msg).method());
200 }
201
202 super.encode(ctx, msg, out);
203
204 if (failOnMissingResponse && !done) {
205
206 if (msg instanceof LastHttpContent) {
207
208 requestResponseCounter.incrementAndGet();
209 }
210 }
211 }
212 }
213
214 private final class Decoder extends HttpResponseDecoder {
215 Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean validateHeaders) {
216 super(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders);
217 }
218
219 Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean validateHeaders,
220 int initialBufferSize, boolean allowDuplicateContentLengths, boolean allowPartialChunks) {
221 super(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders, initialBufferSize,
222 allowDuplicateContentLengths, allowPartialChunks);
223 }
224
225 @Override
226 protected void decode(
227 ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
228 if (done) {
229 int readable = actualReadableBytes();
230 if (readable == 0) {
231
232
233 return;
234 }
235 out.add(buffer.readBytes(readable));
236 } else {
237 int oldSize = out.size();
238 super.decode(ctx, buffer, out);
239 if (failOnMissingResponse) {
240 int size = out.size();
241 for (int i = oldSize; i < size; i++) {
242 decrement(out.get(i));
243 }
244 }
245 }
246 }
247
248 private void decrement(Object msg) {
249 if (msg == null) {
250 return;
251 }
252
253
254 if (msg instanceof LastHttpContent) {
255 requestResponseCounter.decrementAndGet();
256 }
257 }
258
259 @Override
260 protected boolean isContentAlwaysEmpty(HttpMessage msg) {
261
262
263
264
265
266 HttpMethod method = queue.poll();
267
268 final int statusCode = ((HttpResponse) msg).status().code();
269 if (statusCode >= 100 && statusCode < 200) {
270
271
272 return super.isContentAlwaysEmpty(msg);
273 }
274
275
276
277 if (method != null) {
278 char firstChar = method.name().charAt(0);
279 switch (firstChar) {
280 case 'H':
281
282
283
284
285 if (HttpMethod.HEAD.equals(method)) {
286 return true;
287
288
289
290
291
292
293
294
295
296
297
298
299
300 }
301 break;
302 case 'C':
303
304 if (statusCode == 200) {
305 if (HttpMethod.CONNECT.equals(method)) {
306
307
308 if (!parseHttpAfterConnectRequest) {
309 done = true;
310 queue.clear();
311 }
312 return true;
313 }
314 }
315 break;
316 default:
317 break;
318 }
319 }
320 return super.isContentAlwaysEmpty(msg);
321 }
322
323 @Override
324 public void channelInactive(ChannelHandlerContext ctx)
325 throws Exception {
326 super.channelInactive(ctx);
327
328 if (failOnMissingResponse) {
329 long missingResponses = requestResponseCounter.get();
330 if (missingResponses > 0) {
331 ctx.fireExceptionCaught(new PrematureChannelClosureException(
332 "channel gone inactive with " + missingResponses +
333 " missing response(s)"));
334 }
335 }
336 }
337 }
338 }