1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.example.proxy;
17
18 import java.net.InetSocketAddress;
19
20 import org.jboss.netty.bootstrap.ClientBootstrap;
21 import org.jboss.netty.buffer.ChannelBuffer;
22 import org.jboss.netty.buffer.ChannelBuffers;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.ExceptionEvent;
29 import org.jboss.netty.channel.MessageEvent;
30 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
31 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
32
33 public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
34
35 private final ClientSocketChannelFactory cf;
36 private final String remoteHost;
37 private final int remotePort;
38
39
40
41
42 final Object trafficLock = new Object();
43
44 private volatile Channel outboundChannel;
45
46 public HexDumpProxyInboundHandler(
47 ClientSocketChannelFactory cf, String remoteHost, int remotePort) {
48 this.cf = cf;
49 this.remoteHost = remoteHost;
50 this.remotePort = remotePort;
51 }
52
53 @Override
54 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
55 throws Exception {
56
57 final Channel inboundChannel = e.getChannel();
58 inboundChannel.setReadable(false);
59
60
61 ClientBootstrap cb = new ClientBootstrap(cf);
62 cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
63 ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
64
65 outboundChannel = f.getChannel();
66 f.addListener(new ChannelFutureListener() {
67 public void operationComplete(ChannelFuture future) throws Exception {
68 if (future.isSuccess()) {
69
70
71 inboundChannel.setReadable(true);
72 } else {
73
74 inboundChannel.close();
75 }
76 }
77 });
78 }
79
80 @Override
81 public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
82 throws Exception {
83 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
84
85 synchronized (trafficLock) {
86 outboundChannel.write(msg);
87
88
89 if (!outboundChannel.isWritable()) {
90 e.getChannel().setReadable(false);
91 }
92 }
93 }
94
95 @Override
96 public void channelInterestChanged(ChannelHandlerContext ctx,
97 ChannelStateEvent e) throws Exception {
98
99
100 synchronized (trafficLock) {
101 if (e.getChannel().isWritable()) {
102 if (outboundChannel != null) {
103 outboundChannel.setReadable(true);
104 }
105 }
106 }
107 }
108
109 @Override
110 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
111 throws Exception {
112 if (outboundChannel != null) {
113 closeOnFlush(outboundChannel);
114 }
115 }
116
117 @Override
118 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
119 throws Exception {
120 e.getCause().printStackTrace();
121 closeOnFlush(e.getChannel());
122 }
123
124 private class OutboundHandler extends SimpleChannelUpstreamHandler {
125
126 private final Channel inboundChannel;
127
128 OutboundHandler(Channel inboundChannel) {
129 this.inboundChannel = inboundChannel;
130 }
131
132 @Override
133 public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
134 throws Exception {
135 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
136
137 synchronized (trafficLock) {
138 inboundChannel.write(msg);
139
140
141 if (!inboundChannel.isWritable()) {
142 e.getChannel().setReadable(false);
143 }
144 }
145 }
146
147 @Override
148 public void channelInterestChanged(ChannelHandlerContext ctx,
149 ChannelStateEvent e) throws Exception {
150
151
152 synchronized (trafficLock) {
153 if (e.getChannel().isWritable()) {
154 inboundChannel.setReadable(true);
155 }
156 }
157 }
158
159 @Override
160 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
161 throws Exception {
162 closeOnFlush(inboundChannel);
163 }
164
165 @Override
166 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
167 throws Exception {
168 e.getCause().printStackTrace();
169 closeOnFlush(e.getChannel());
170 }
171 }
172
173
174
175
176 static void closeOnFlush(Channel ch) {
177 if (ch.isConnected()) {
178 ch.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
179 }
180 }
181 }