1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelHandlerContext;
21 import io.netty5.channel.ChannelPipeline;
22 import io.netty5.channel.CombinedChannelDuplexHandler;
23 import io.netty5.channel.internal.DelegatingChannelHandlerContext;
24 import io.netty5.handler.codec.PrematureChannelClosureException;
25
26 import java.util.ArrayDeque;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import static io.netty5.handler.codec.http.HttpObjectDecoder.DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS;
32 import static io.netty5.handler.codec.http.HttpObjectDecoder.DEFAULT_MAX_HEADER_SIZE;
33 import static io.netty5.handler.codec.http.HttpObjectDecoder.DEFAULT_MAX_INITIAL_LINE_LENGTH;
34 import static io.netty5.handler.codec.http.HttpObjectDecoder.DEFAULT_VALIDATE_HEADERS;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>
51 implements HttpClientUpgradeHandler.SourceCodec {
52 public static final boolean DEFAULT_FAIL_ON_MISSING_RESPONSE = false;
53 public static final boolean DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST = false;
54
55
56 private final Queue<HttpMethod> queue = new ArrayDeque<>();
57 private final boolean parseHttpAfterConnectRequest;
58
59
60 private boolean done;
61
62 private final AtomicLong requestResponseCounter = new AtomicLong();
63 private final boolean failOnMissingResponse;
64
65
66
67
68
69
70 public HttpClientCodec() {
71 this(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, DEFAULT_FAIL_ON_MISSING_RESPONSE);
72 }
73
74
75
76
77 public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize) {
78 this(maxInitialLineLength, maxHeaderSize, DEFAULT_FAIL_ON_MISSING_RESPONSE);
79 }
80
81
82
83
84 public HttpClientCodec(
85 int maxInitialLineLength, int maxHeaderSize, boolean failOnMissingResponse) {
86 this(maxInitialLineLength, maxHeaderSize, failOnMissingResponse, DEFAULT_VALIDATE_HEADERS);
87 }
88
89
90
91
92 public HttpClientCodec(
93 int maxInitialLineLength, int maxHeaderSize, boolean failOnMissingResponse,
94 boolean validateHeaders) {
95 this(maxInitialLineLength, maxHeaderSize, failOnMissingResponse, validateHeaders,
96 DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST);
97 }
98
99
100
101
102 public HttpClientCodec(
103 int maxInitialLineLength, int maxHeaderSize, boolean failOnMissingResponse,
104 boolean validateHeaders, boolean parseHttpAfterConnectRequest) {
105 init(new Decoder(maxInitialLineLength, maxHeaderSize, validateHeaders), new Encoder());
106 this.failOnMissingResponse = failOnMissingResponse;
107 this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest;
108 }
109
110
111
112
113 public HttpClientCodec(
114 int maxInitialLineLength, int maxHeaderSize, boolean failOnMissingResponse,
115 boolean validateHeaders, int initialBufferSize) {
116 this(maxInitialLineLength, maxHeaderSize, failOnMissingResponse, validateHeaders,
117 initialBufferSize, DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST);
118 }
119
120
121
122
123 public HttpClientCodec(
124 int maxInitialLineLength, int maxHeaderSize, boolean failOnMissingResponse,
125 boolean validateHeaders, int initialBufferSize, boolean parseHttpAfterConnectRequest) {
126 this(maxInitialLineLength, maxHeaderSize, failOnMissingResponse, validateHeaders,
127 initialBufferSize, parseHttpAfterConnectRequest, DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS);
128 }
129
130
131
132
133 public HttpClientCodec(
134 int maxInitialLineLength, int maxHeaderSize, boolean failOnMissingResponse,
135 boolean validateHeaders, int initialBufferSize, boolean parseHttpAfterConnectRequest,
136 boolean allowDuplicateContentLengths) {
137 init(new Decoder(maxInitialLineLength, maxHeaderSize, validateHeaders, initialBufferSize,
138 allowDuplicateContentLengths),
139 new Encoder());
140 this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest;
141 this.failOnMissingResponse = failOnMissingResponse;
142 }
143
144
145
146
147 @Override
148 public void prepareUpgradeFrom(ChannelHandlerContext ctx) {
149 ((Encoder) outboundHandler()).upgraded = true;
150 }
151
152
153
154
155
156 @Override
157 public void upgradeFrom(ChannelHandlerContext ctx) {
158 final ChannelPipeline p = ctx.pipeline();
159 p.remove(this);
160 }
161
162 public void setSingleDecode(boolean singleDecode) {
163 inboundHandler().setSingleDecode(singleDecode);
164 }
165
166 public boolean isSingleDecode() {
167 return inboundHandler().isSingleDecode();
168 }
169
170 private final class Encoder extends HttpRequestEncoder {
171
172 boolean upgraded;
173
174 @Override
175 protected void encodeAndClose(
176 ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
177
178 if (upgraded) {
179 out.add(msg);
180 return;
181 }
182
183 if (msg instanceof HttpRequest) {
184 queue.offer(((HttpRequest) msg).method());
185 }
186
187 super.encodeAndClose(ctx, msg, out);
188
189 if (failOnMissingResponse && !done) {
190
191 if (msg instanceof LastHttpContent) {
192
193 requestResponseCounter.incrementAndGet();
194 }
195 }
196 }
197 }
198
199 private final class Decoder extends HttpResponseDecoder {
200
201 private ChannelHandlerContext context;
202
203 Decoder(int maxInitialLineLength, int maxHeaderSize, boolean validateHeaders) {
204 super(maxInitialLineLength, maxHeaderSize, validateHeaders);
205 }
206
207 Decoder(int maxInitialLineLength, int maxHeaderSize, boolean validateHeaders,
208 int initialBufferSize, boolean allowDuplicateContentLengths) {
209 super(maxInitialLineLength, maxHeaderSize, validateHeaders, initialBufferSize,
210 allowDuplicateContentLengths);
211 }
212
213 @Override
214 protected void handlerAdded0(ChannelHandlerContext ctx) {
215 if (failOnMissingResponse) {
216 context = new DelegatingChannelHandlerContext(ctx) {
217 @Override
218 public ChannelHandlerContext fireChannelRead(Object msg) {
219 decrement(msg);
220
221 super.fireChannelRead(msg);
222 return this;
223 }
224 };
225 } else {
226 context = ctx;
227 }
228 }
229
230 @Override
231 protected void decode(ChannelHandlerContext ctx, Buffer buffer) throws Exception {
232 if (done) {
233 int readable = actualReadableBytes();
234 if (readable == 0) {
235
236
237 return;
238 }
239 ctx.fireChannelRead(buffer.readSplit(readable));
240 } else {
241 super.decode(context, buffer);
242 }
243 }
244
245 private void decrement(Object msg) {
246 if (msg == null) {
247 return;
248 }
249
250
251 if (msg instanceof LastHttpContent) {
252 requestResponseCounter.decrementAndGet();
253 }
254 }
255
256 @Override
257 protected boolean isContentAlwaysEmpty(HttpMessage msg) {
258
259
260
261
262
263 HttpMethod method = queue.poll();
264
265 final int statusCode = ((HttpResponse) msg).status().code();
266 if (statusCode >= 100 && statusCode < 200) {
267
268
269 return super.isContentAlwaysEmpty(msg);
270 }
271
272
273
274 if (method != null) {
275 char firstChar = method.name().charAt(0);
276 switch (firstChar) {
277 case 'H':
278
279
280
281
282 if (HttpMethod.HEAD.equals(method)) {
283 return true;
284
285
286
287
288
289
290
291
292
293
294
295
296
297 }
298 break;
299 case 'C':
300
301 if (statusCode == 200) {
302 if (HttpMethod.CONNECT.equals(method)) {
303
304
305 if (!parseHttpAfterConnectRequest) {
306 done = true;
307 queue.clear();
308 }
309 return true;
310 }
311 }
312 break;
313 default:
314 break;
315 }
316 }
317 return super.isContentAlwaysEmpty(msg);
318 }
319
320 @Override
321 public void channelInactive(ChannelHandlerContext ctx)
322 throws Exception {
323 super.channelInactive(ctx);
324
325 if (failOnMissingResponse) {
326 long missingResponses = requestResponseCounter.get();
327 if (missingResponses > 0) {
328 ctx.fireChannelExceptionCaught(new PrematureChannelClosureException(
329 "channel gone inactive with " + missingResponses +
330 " missing response(s)"));
331 }
332 }
333 }
334 }
335 }