View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.example.proxy;
17  
18  import java.net.InetSocketAddress;
19  
20  import org.jboss.netty.bootstrap.ClientBootstrap;
21  import org.jboss.netty.buffer.ChannelBuffer;
22  import org.jboss.netty.buffer.ChannelBuffers;
23  import org.jboss.netty.channel.Channel;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelFutureListener;
26  import org.jboss.netty.channel.ChannelHandlerContext;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.ExceptionEvent;
29  import org.jboss.netty.channel.MessageEvent;
30  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
31  import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
32  
33  public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
34  
35      private final ClientSocketChannelFactory cf;
36      private final String remoteHost;
37      private final int remotePort;
38  
39      // This lock guards against the race condition that overrides the
40      // OP_READ flag incorrectly.
41      // See the related discussion: http://markmail.org/message/x7jc6mqx6ripynqf
42      final Object trafficLock = new Object();
43  
44      private volatile Channel outboundChannel;
45  
46      public HexDumpProxyInboundHandler(
47              ClientSocketChannelFactory cf, String remoteHost, int remotePort) {
48          this.cf = cf;
49          this.remoteHost = remoteHost;
50          this.remotePort = remotePort;
51      }
52  
53      @Override
54      public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
55              throws Exception {
56          // Suspend incoming traffic until connected to the remote host.
57          final Channel inboundChannel = e.getChannel();
58          inboundChannel.setReadable(false);
59  
60          // Start the connection attempt.
61          ClientBootstrap cb = new ClientBootstrap(cf);
62          cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
63          ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
64  
65          outboundChannel = f.getChannel();
66          f.addListener(new ChannelFutureListener() {
67              public void operationComplete(ChannelFuture future) throws Exception {
68                  if (future.isSuccess()) {
69                      // Connection attempt succeeded:
70                      // Begin to accept incoming traffic.
71                      inboundChannel.setReadable(true);
72                  } else {
73                      // Close the connection if the connection attempt has failed.
74                      inboundChannel.close();
75                  }
76              }
77          });
78      }
79  
80      @Override
81      public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
82              throws Exception {
83          ChannelBuffer msg = (ChannelBuffer) e.getMessage();
84          //System.out.println(">>> " + ChannelBuffers.hexDump(msg));
85          synchronized (trafficLock) {
86              outboundChannel.write(msg);
87              // If outboundChannel is saturated, do not read until notified in
88              // OutboundHandler.channelInterestChanged().
89              if (!outboundChannel.isWritable()) {
90                  e.getChannel().setReadable(false);
91              }
92          }
93      }
94  
95      @Override
96      public void channelInterestChanged(ChannelHandlerContext ctx,
97              ChannelStateEvent e) throws Exception {
98          // If inboundChannel is not saturated anymore, continue accepting
99          // the incoming traffic from the outboundChannel.
100         synchronized (trafficLock) {
101             if (e.getChannel().isWritable()) {
102                 if (outboundChannel != null) {
103                     outboundChannel.setReadable(true);
104                 }
105             }
106         }
107     }
108 
109     @Override
110     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
111             throws Exception {
112         if (outboundChannel != null) {
113             closeOnFlush(outboundChannel);
114         }
115     }
116 
117     @Override
118     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
119             throws Exception {
120         e.getCause().printStackTrace();
121         closeOnFlush(e.getChannel());
122     }
123 
124     private class OutboundHandler extends SimpleChannelUpstreamHandler {
125 
126         private final Channel inboundChannel;
127 
128         OutboundHandler(Channel inboundChannel) {
129             this.inboundChannel = inboundChannel;
130         }
131 
132         @Override
133         public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
134                 throws Exception {
135             ChannelBuffer msg = (ChannelBuffer) e.getMessage();
136             //System.out.println("<<< " + ChannelBuffers.hexDump(msg));
137             synchronized (trafficLock) {
138                 inboundChannel.write(msg);
139                 // If inboundChannel is saturated, do not read until notified in
140                 // HexDumpProxyInboundHandler.channelInterestChanged().
141                 if (!inboundChannel.isWritable()) {
142                     e.getChannel().setReadable(false);
143                 }
144             }
145         }
146 
147         @Override
148         public void channelInterestChanged(ChannelHandlerContext ctx,
149                 ChannelStateEvent e) throws Exception {
150             // If outboundChannel is not saturated anymore, continue accepting
151             // the incoming traffic from the inboundChannel.
152             synchronized (trafficLock) {
153                 if (e.getChannel().isWritable()) {
154                     inboundChannel.setReadable(true);
155                 }
156             }
157         }
158 
159         @Override
160         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
161                 throws Exception {
162             closeOnFlush(inboundChannel);
163         }
164 
165         @Override
166         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
167                 throws Exception {
168             e.getCause().printStackTrace();
169             closeOnFlush(e.getChannel());
170         }
171     }
172 
173     /**
174      * Closes the specified channel after all queued write requests are flushed.
175      */
176     static void closeOnFlush(Channel ch) {
177         if (ch.isConnected()) {
178             ch.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
179         }
180     }
181 }