1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.http;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.InetSocketAddress;
21 import java.net.SocketAddress;
22 import java.nio.channels.NotYetConnectedException;
23
24 import javax.net.ssl.SSLContext;
25 import javax.net.ssl.SSLEngine;
26
27 import org.jboss.netty.buffer.ChannelBuffer;
28 import org.jboss.netty.buffer.ChannelBuffers;
29 import org.jboss.netty.channel.AbstractChannel;
30 import org.jboss.netty.channel.ChannelException;
31 import org.jboss.netty.channel.ChannelFactory;
32 import org.jboss.netty.channel.ChannelFuture;
33 import org.jboss.netty.channel.ChannelFutureListener;
34 import org.jboss.netty.channel.ChannelHandlerContext;
35 import org.jboss.netty.channel.ChannelPipeline;
36 import org.jboss.netty.channel.ChannelSink;
37 import org.jboss.netty.channel.ChannelStateEvent;
38 import org.jboss.netty.channel.DefaultChannelPipeline;
39 import org.jboss.netty.channel.ExceptionEvent;
40 import org.jboss.netty.channel.MessageEvent;
41 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
42 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
43 import org.jboss.netty.channel.socket.SocketChannel;
44 import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
45 import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
46 import org.jboss.netty.handler.codec.http.HttpChunk;
47 import org.jboss.netty.handler.codec.http.HttpHeaders;
48 import org.jboss.netty.handler.codec.http.HttpMethod;
49 import org.jboss.netty.handler.codec.http.HttpRequest;
50 import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
51 import org.jboss.netty.handler.codec.http.HttpResponse;
52 import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
53 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
54 import org.jboss.netty.handler.codec.http.HttpVersion;
55 import org.jboss.netty.handler.ssl.SslHandler;
56
57
58
59 class HttpTunnelingClientSocketChannel extends AbstractChannel
60 implements SocketChannel {
61
62 final HttpTunnelingSocketChannelConfig config;
63
64 volatile boolean requestHeaderWritten;
65
66 final Object interestOpsLock = new Object();
67
68 final SocketChannel realChannel;
69
70 private final ServletChannelHandler handler = new ServletChannelHandler();
71
72 HttpTunnelingClientSocketChannel(
73 ChannelFactory factory,
74 ChannelPipeline pipeline,
75 ChannelSink sink, ClientSocketChannelFactory clientSocketChannelFactory) {
76
77 super(null, factory, pipeline, sink);
78
79 config = new HttpTunnelingSocketChannelConfig(this);
80 DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
81 channelPipeline.addLast("decoder", new HttpResponseDecoder());
82 channelPipeline.addLast("encoder", new HttpRequestEncoder());
83 channelPipeline.addLast("handler", handler);
84 realChannel = clientSocketChannelFactory.newChannel(channelPipeline);
85
86 fireChannelOpen(this);
87 }
88
89 public HttpTunnelingSocketChannelConfig getConfig() {
90 return config;
91 }
92
93 public InetSocketAddress getLocalAddress() {
94 return realChannel.getLocalAddress();
95 }
96
97 public InetSocketAddress getRemoteAddress() {
98 return realChannel.getRemoteAddress();
99 }
100
101 public boolean isBound() {
102 return realChannel.isBound();
103 }
104
105 public boolean isConnected() {
106 return realChannel.isConnected();
107 }
108
109 @Override
110 public int getInterestOps() {
111 return realChannel.getInterestOps();
112 }
113
114 @Override
115 public boolean isWritable() {
116 return realChannel.isWritable();
117 }
118
119 @Override
120 protected boolean setClosed() {
121 return super.setClosed();
122 }
123
124 @Override
125 public ChannelFuture write(Object message, SocketAddress remoteAddress) {
126 if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
127 return super.write(message, null);
128 } else {
129 return getUnsupportedOperationFuture();
130 }
131 }
132
133 void bindReal(final SocketAddress localAddress, final ChannelFuture future) {
134 realChannel.bind(localAddress).addListener(new ChannelFutureListener() {
135 public void operationComplete(ChannelFuture f) {
136 if (f.isSuccess()) {
137 future.setSuccess();
138 } else {
139 future.setFailure(f.getCause());
140 }
141 }
142 });
143 }
144
145 void connectReal(final SocketAddress remoteAddress, final ChannelFuture future) {
146 final SocketChannel virtualChannel = this;
147 realChannel.connect(remoteAddress).addListener(new ChannelFutureListener() {
148 public void operationComplete(ChannelFuture f) {
149 final String serverName = config.getServerName();
150 final int serverPort = ((InetSocketAddress) remoteAddress).getPort();
151 final String serverPath = config.getServerPath();
152
153 if (f.isSuccess()) {
154
155 SSLContext sslContext = config.getSslContext();
156 ChannelFuture sslHandshakeFuture = null;
157 if (sslContext != null) {
158
159 SSLEngine engine;
160 if (serverName != null) {
161 engine = sslContext.createSSLEngine(serverName, serverPort);
162 } else {
163 engine = sslContext.createSSLEngine();
164 }
165
166
167 engine.setUseClientMode(true);
168 engine.setEnableSessionCreation(config.isEnableSslSessionCreation());
169 String[] enabledCipherSuites = config.getEnabledSslCipherSuites();
170 if (enabledCipherSuites != null) {
171 engine.setEnabledCipherSuites(enabledCipherSuites);
172 }
173 String[] enabledProtocols = config.getEnabledSslProtocols();
174 if (enabledProtocols != null) {
175 engine.setEnabledProtocols(enabledProtocols);
176 }
177
178 SslHandler sslHandler = new SslHandler(engine);
179 realChannel.getPipeline().addFirst("ssl", sslHandler);
180 sslHandshakeFuture = sslHandler.handshake();
181 }
182
183
184 final HttpRequest req = new DefaultHttpRequest(
185 HttpVersion.HTTP_1_1, HttpMethod.POST, serverPath);
186 if (serverName != null) {
187 req.headers().set(HttpHeaders.Names.HOST, serverName);
188 }
189 req.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
190 req.headers().set(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
191 req.headers().set(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
192 req.headers().set(HttpHeaders.Names.USER_AGENT, HttpTunnelingClientSocketChannel.class.getName());
193
194 if (sslHandshakeFuture == null) {
195 realChannel.write(req);
196 requestHeaderWritten = true;
197 future.setSuccess();
198 fireChannelConnected(virtualChannel, remoteAddress);
199 } else {
200 sslHandshakeFuture.addListener(new ChannelFutureListener() {
201 public void operationComplete(ChannelFuture f) {
202 if (f.isSuccess()) {
203 realChannel.write(req);
204 requestHeaderWritten = true;
205 future.setSuccess();
206 fireChannelConnected(virtualChannel, remoteAddress);
207 } else {
208 future.setFailure(f.getCause());
209 fireExceptionCaught(virtualChannel, f.getCause());
210 }
211 }
212 });
213 }
214 } else {
215 future.setFailure(f.getCause());
216 fireExceptionCaught(virtualChannel, f.getCause());
217 }
218 }
219 });
220 }
221
222 void writeReal(final ChannelBuffer a, final ChannelFuture future) {
223 if (!requestHeaderWritten) {
224 throw new NotYetConnectedException();
225 }
226
227 final int size = a.readableBytes();
228 final ChannelFuture f;
229
230 if (size == 0) {
231 f = realChannel.write(ChannelBuffers.EMPTY_BUFFER);
232 } else {
233 f = realChannel.write(new DefaultHttpChunk(a));
234 }
235
236 f.addListener(new ChannelFutureListener() {
237 public void operationComplete(ChannelFuture f) {
238 if (f.isSuccess()) {
239 future.setSuccess();
240 if (size != 0) {
241 fireWriteComplete(HttpTunnelingClientSocketChannel.this, size);
242 }
243 } else {
244 future.setFailure(f.getCause());
245 }
246 }
247 });
248 }
249
250 private ChannelFuture writeLastChunk() {
251 if (!requestHeaderWritten) {
252 return failedFuture(this, new NotYetConnectedException());
253 } else {
254 return realChannel.write(HttpChunk.LAST_CHUNK);
255 }
256 }
257
258 void setInterestOpsReal(final int interestOps, final ChannelFuture future) {
259 realChannel.setInterestOps(interestOps).addListener(new ChannelFutureListener() {
260 public void operationComplete(ChannelFuture f) {
261 if (f.isSuccess()) {
262 future.setSuccess();
263 } else {
264 future.setFailure(f.getCause());
265 }
266 }
267 });
268 }
269
270 void disconnectReal(final ChannelFuture future) {
271 writeLastChunk().addListener(new ChannelFutureListener() {
272 public void operationComplete(ChannelFuture f) {
273 realChannel.disconnect().addListener(new ChannelFutureListener() {
274 public void operationComplete(ChannelFuture f) {
275 if (f.isSuccess()) {
276 future.setSuccess();
277 } else {
278 future.setFailure(f.getCause());
279 }
280 }
281 });
282 }
283 });
284 }
285
286 void unbindReal(final ChannelFuture future) {
287 writeLastChunk().addListener(new ChannelFutureListener() {
288 public void operationComplete(ChannelFuture f) {
289 realChannel.unbind().addListener(new ChannelFutureListener() {
290 public void operationComplete(ChannelFuture f) {
291 if (f.isSuccess()) {
292 future.setSuccess();
293 } else {
294 future.setFailure(f.getCause());
295 }
296 }
297 });
298 }
299 });
300 }
301
302 void closeReal(final ChannelFuture future) {
303 writeLastChunk().addListener(new ChannelFutureListener() {
304 public void operationComplete(ChannelFuture f) {
305 realChannel.close().addListener(new ChannelFutureListener() {
306 public void operationComplete(ChannelFuture f) {
307
308
309
310
311
312 if (f.isSuccess()) {
313 future.setSuccess();
314 } else {
315 future.setFailure(f.getCause());
316 }
317
318
319 setClosed();
320 }
321 });
322 }
323 });
324 }
325
326 final class ServletChannelHandler extends SimpleChannelUpstreamHandler {
327
328 private volatile boolean readingChunks;
329 final SocketChannel virtualChannel = HttpTunnelingClientSocketChannel.this;
330
331 @Override
332 public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e)
333 throws Exception {
334 fireChannelBound(virtualChannel, (SocketAddress) e.getValue());
335 }
336
337 @Override
338 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
339 if (!readingChunks) {
340 HttpResponse res = (HttpResponse) e.getMessage();
341 if (res.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
342 throw new ChannelException("Unexpected HTTP response status: " + res.getStatus());
343 }
344
345 if (res.isChunked()) {
346 readingChunks = true;
347 } else {
348 ChannelBuffer content = res.getContent();
349 if (content.readable()) {
350 fireMessageReceived(HttpTunnelingClientSocketChannel.this, content);
351 }
352
353 closeReal(succeededFuture(virtualChannel));
354 }
355 } else {
356 HttpChunk chunk = (HttpChunk) e.getMessage();
357 if (!chunk.isLast()) {
358 fireMessageReceived(HttpTunnelingClientSocketChannel.this, chunk.getContent());
359 } else {
360 readingChunks = false;
361
362 closeReal(succeededFuture(virtualChannel));
363 }
364 }
365 }
366
367 @Override
368 public void channelInterestChanged(ChannelHandlerContext ctx,
369 ChannelStateEvent e) throws Exception {
370 fireChannelInterestChanged(virtualChannel);
371 }
372
373 @Override
374 public void channelDisconnected(ChannelHandlerContext ctx,
375 ChannelStateEvent e) throws Exception {
376 fireChannelDisconnected(virtualChannel);
377 }
378
379 @Override
380 public void channelUnbound(ChannelHandlerContext ctx,
381 ChannelStateEvent e) throws Exception {
382 fireChannelUnbound(virtualChannel);
383 }
384
385 @Override
386 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
387 throws Exception {
388 fireChannelClosed(virtualChannel);
389 }
390
391 @Override
392 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
393 fireExceptionCaught(virtualChannel, e.getCause());
394 realChannel.close();
395 }
396 }
397 }