1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.http;
17
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.atomic.AtomicLong;
21
22 import org.jboss.netty.buffer.ChannelBuffer;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelDownstreamHandler;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.ChannelUpstreamHandler;
29 import org.jboss.netty.handler.codec.PrematureChannelClosureException;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class HttpClientCodec implements ChannelUpstreamHandler,
49 ChannelDownstreamHandler {
50
51
52 final Queue<HttpMethod> queue = new ConcurrentLinkedQueue<HttpMethod>();
53
54
55 volatile boolean done;
56
57 private final HttpRequestEncoder encoder = new Encoder();
58 private final HttpResponseDecoder decoder;
59 private final AtomicLong requestResponseCounter = new AtomicLong(0);
60
61 private final boolean failOnMissingResponse;
62
63
64
65
66
67
68
69 public HttpClientCodec() {
70 this(4096, 8192, 8192, false);
71 }
72
73
74
75
76 public HttpClientCodec(
77 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
78 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false);
79 }
80
81
82
83
84 public HttpClientCodec(
85 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse) {
86 decoder = new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize);
87 this.failOnMissingResponse = failOnMissingResponse;
88 }
89
90 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
91 throws Exception {
92 decoder.handleUpstream(ctx, e);
93 }
94
95 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
96 throws Exception {
97 encoder.handleDownstream(ctx, e);
98 }
99
100 private final class Encoder extends HttpRequestEncoder {
101
102 Encoder() {
103 }
104
105 @Override
106 protected Object encode(ChannelHandlerContext ctx, Channel channel,
107 Object msg) throws Exception {
108 if (msg instanceof HttpRequest && !done) {
109 queue.offer(((HttpRequest) msg).getMethod());
110 }
111
112 Object obj = super.encode(ctx, channel, msg);
113
114 if (failOnMissingResponse) {
115
116 if (msg instanceof HttpRequest && !((HttpRequest) msg).isChunked()) {
117 requestResponseCounter.incrementAndGet();
118 } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
119
120 requestResponseCounter.incrementAndGet();
121 }
122 }
123 return obj;
124 }
125 }
126
127 private final class Decoder extends HttpResponseDecoder {
128
129 Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
130 super(maxInitialLineLength, maxHeaderSize, maxChunkSize);
131 }
132
133 @Override
134 protected Object decode(ChannelHandlerContext ctx, Channel channel,
135 ChannelBuffer buffer, State state) throws Exception {
136 if (done) {
137 return buffer.readBytes(actualReadableBytes());
138 } else {
139 Object msg = super.decode(ctx, channel, buffer, state);
140 if (failOnMissingResponse) {
141 decrement(msg);
142 }
143 return msg;
144 }
145 }
146
147 private void decrement(Object msg) {
148 if (msg == null) {
149 return;
150 }
151
152
153 if (msg instanceof HttpMessage && !((HttpMessage) msg).isChunked()) {
154 requestResponseCounter.decrementAndGet();
155 } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
156 requestResponseCounter.decrementAndGet();
157 } else if (msg instanceof Object[]) {
158
159
160 requestResponseCounter.decrementAndGet();
161 }
162 }
163 @Override
164 protected boolean isContentAlwaysEmpty(HttpMessage msg) {
165 final int statusCode = ((HttpResponse) msg).getStatus().getCode();
166 if (statusCode == 100) {
167
168 return true;
169 }
170
171
172
173 HttpMethod method = queue.poll();
174
175 char firstChar = method.getName().charAt(0);
176 switch (firstChar) {
177 case 'H':
178
179
180
181
182 if (HttpMethod.HEAD.equals(method)) {
183 return true;
184
185
186
187
188
189
190
191
192
193
194
195
196
197 }
198 break;
199 case 'C':
200
201 if (statusCode == 200) {
202 if (HttpMethod.CONNECT.equals(method)) {
203
204 done = true;
205 queue.clear();
206 return true;
207 }
208 }
209 break;
210 }
211
212 return super.isContentAlwaysEmpty(msg);
213 }
214
215 @Override
216 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
217 super.channelClosed(ctx, e);
218 if (failOnMissingResponse) {
219 long missingResponses = requestResponseCounter.get();
220 if (missingResponses > 0) {
221 throw new PrematureChannelClosureException(
222 "Channel closed but still missing " + missingResponses + " response(s)");
223 }
224 }
225 }
226 }
227 }