1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.CombinedChannelDuplexHandler;
22 import io.netty.handler.codec.PrematureChannelClosureException;
23
24 import java.util.ArrayDeque;
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.atomic.AtomicLong;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 public final class HttpClientCodec
44 extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder> {
45
46
47 private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
48
49
50 private boolean done;
51
52 private final AtomicLong requestResponseCounter = new AtomicLong();
53 private final boolean failOnMissingResponse;
54
55
56
57
58
59
60 public HttpClientCodec() {
61 this(4096, 8192, 8192, false);
62 }
63
64 public void setSingleDecode(boolean singleDecode) {
65 inboundHandler().setSingleDecode(singleDecode);
66 }
67
68 public boolean isSingleDecode() {
69 return inboundHandler().isSingleDecode();
70 }
71
72
73
74
75 public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
76 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false);
77 }
78
79
80
81
82 public HttpClientCodec(
83 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse) {
84 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, true);
85 }
86
87
88
89
90 public HttpClientCodec(
91 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
92 boolean validateHeaders) {
93 init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new Encoder());
94 this.failOnMissingResponse = failOnMissingResponse;
95 }
96
97
98
99
100 public HttpClientCodec(
101 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
102 boolean validateHeaders, int initialBufferSize) {
103 init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders, initialBufferSize),
104 new Encoder());
105 this.failOnMissingResponse = failOnMissingResponse;
106 }
107
108 private final class Encoder extends HttpRequestEncoder {
109
110 @Override
111 protected void encode(
112 ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
113 if (msg instanceof HttpRequest && !done) {
114 queue.offer(((HttpRequest) msg).getMethod());
115 }
116
117 super.encode(ctx, msg, out);
118
119 if (failOnMissingResponse) {
120
121 if (msg instanceof LastHttpContent) {
122
123 requestResponseCounter.incrementAndGet();
124 }
125 }
126 }
127 }
128
129 private final class Decoder extends HttpResponseDecoder {
130 Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean validateHeaders) {
131 super(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders);
132 }
133
134 Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean validateHeaders,
135 int initialBufferSize) {
136 super(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders, initialBufferSize);
137 }
138
139 @Override
140 protected void decode(
141 ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
142 if (done) {
143 int readable = actualReadableBytes();
144 if (readable == 0) {
145
146
147 return;
148 }
149 out.add(buffer.readBytes(readable));
150 } else {
151 int oldSize = out.size();
152 super.decode(ctx, buffer, out);
153 if (failOnMissingResponse) {
154 int size = out.size();
155 for (int i = oldSize; i < size; i++) {
156 decrement(out.get(i));
157 }
158 }
159 }
160 }
161
162 private void decrement(Object msg) {
163 if (msg == null) {
164 return;
165 }
166
167
168 if (msg instanceof LastHttpContent) {
169 requestResponseCounter.decrementAndGet();
170 }
171 }
172
173 @Override
174 protected boolean isContentAlwaysEmpty(HttpMessage msg) {
175 final int statusCode = ((HttpResponse) msg).getStatus().code();
176 if (statusCode == 100 || statusCode == 101) {
177
178
179 return super.isContentAlwaysEmpty(msg);
180 }
181
182
183
184 HttpMethod method = queue.poll();
185
186 char firstChar = method.name().charAt(0);
187 switch (firstChar) {
188 case 'H':
189
190
191
192
193 if (HttpMethod.HEAD.equals(method)) {
194 return true;
195
196
197
198
199
200
201
202
203
204
205
206
207
208 }
209 break;
210 case 'C':
211
212 if (statusCode == 200) {
213 if (HttpMethod.CONNECT.equals(method)) {
214
215 done = true;
216 queue.clear();
217 return true;
218 }
219 }
220 break;
221 }
222
223 return super.isContentAlwaysEmpty(msg);
224 }
225
226 @Override
227 public void channelInactive(ChannelHandlerContext ctx)
228 throws Exception {
229 super.channelInactive(ctx);
230
231 if (failOnMissingResponse) {
232 long missingResponses = requestResponseCounter.get();
233 if (missingResponses > 0) {
234 ctx.fireExceptionCaught(new PrematureChannelClosureException(
235 "channel gone inactive with " + missingResponses +
236 " missing response(s)"));
237 }
238 }
239 }
240 }
241 }