1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.http;
17
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.atomic.AtomicLong;
21
22 import org.jboss.netty.buffer.ChannelBuffer;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelDownstreamHandler;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.ChannelUpstreamHandler;
29 import org.jboss.netty.handler.codec.PrematureChannelClosureException;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class HttpClientCodec implements ChannelUpstreamHandler,
49 ChannelDownstreamHandler {
50
51
52 final Queue<HttpMethod> queue = new ConcurrentLinkedQueue<HttpMethod>();
53
54
55 volatile boolean done;
56
57 private final HttpRequestEncoder encoder = new Encoder();
58 private final HttpResponseDecoder decoder;
59 private final AtomicLong requestResponseCounter = new AtomicLong(0);
60
61 private final boolean failOnMissingResponse;
62
63
64
65
66
67
68
69 public HttpClientCodec() {
70 this(4096, 8192, 8192, false);
71 }
72
73
74
75
76 public HttpClientCodec(
77 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
78 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false);
79 }
80
81
82
83
84 public HttpClientCodec(
85 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse) {
86 decoder = new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize);
87 this.failOnMissingResponse = failOnMissingResponse;
88 }
89
90 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
91 throws Exception {
92 decoder.handleUpstream(ctx, e);
93 }
94
95 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
96 throws Exception {
97 encoder.handleDownstream(ctx, e);
98 }
99
100 private final class Encoder extends HttpRequestEncoder {
101
102 Encoder() {
103 }
104
105 @Override
106 protected Object encode(ChannelHandlerContext ctx, Channel channel,
107 Object msg) throws Exception {
108 if (msg instanceof HttpRequest && !done) {
109 queue.offer(((HttpRequest) msg).getMethod());
110 }
111
112 Object obj = super.encode(ctx, channel, msg);
113
114 if (failOnMissingResponse) {
115
116 if (msg instanceof HttpRequest && !((HttpRequest) msg).isChunked()) {
117 requestResponseCounter.incrementAndGet();
118 } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
119
120 requestResponseCounter.incrementAndGet();
121 }
122 }
123 return obj;
124 }
125 }
126
127 private final class Decoder extends HttpResponseDecoder {
128
129 Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
130 super(maxInitialLineLength, maxHeaderSize, maxChunkSize);
131 }
132
133 @Override
134 protected Object decode(ChannelHandlerContext ctx, Channel channel,
135 ChannelBuffer buffer, State state) throws Exception {
136 if (done) {
137 int readable = actualReadableBytes();
138 if (readable == 0) {
139 return null;
140 }
141 return buffer.readBytes(readable);
142 } else {
143 Object msg = super.decode(ctx, channel, buffer, state);
144 if (failOnMissingResponse) {
145 decrement(msg);
146 }
147 return msg;
148 }
149 }
150
151 private void decrement(Object msg) {
152 if (msg == null) {
153 return;
154 }
155
156
157 if (msg instanceof HttpMessage && !((HttpMessage) msg).isChunked()) {
158 requestResponseCounter.decrementAndGet();
159 } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
160 requestResponseCounter.decrementAndGet();
161 } else if (msg instanceof Object[]) {
162
163
164 requestResponseCounter.decrementAndGet();
165 }
166 }
167 @Override
168 protected boolean isContentAlwaysEmpty(HttpMessage msg) {
169 final int statusCode = ((HttpResponse) msg).getStatus().getCode();
170 if (statusCode == 100) {
171
172 return true;
173 }
174
175
176
177 HttpMethod method = queue.poll();
178
179 char firstChar = method.getName().charAt(0);
180 switch (firstChar) {
181 case 'H':
182
183
184
185
186 if (HttpMethod.HEAD.equals(method)) {
187 return true;
188
189
190
191
192
193
194
195
196
197
198
199
200
201 }
202 break;
203 case 'C':
204
205 if (statusCode == 200) {
206 if (HttpMethod.CONNECT.equals(method)) {
207
208 done = true;
209 queue.clear();
210 return true;
211 }
212 }
213 break;
214 }
215
216 return super.isContentAlwaysEmpty(msg);
217 }
218
219 @Override
220 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
221 super.channelClosed(ctx, e);
222 if (failOnMissingResponse) {
223 long missingResponses = requestResponseCounter.get();
224 if (missingResponses > 0) {
225 throw new PrematureChannelClosureException(
226 "Channel closed but still missing " + missingResponses + " response(s)");
227 }
228 }
229 }
230 }
231 }