View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.example.proxy;
17  
18  import org.jboss.netty.bootstrap.ClientBootstrap;
19  import org.jboss.netty.buffer.ChannelBuffer;
20  import org.jboss.netty.buffer.ChannelBuffers;
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelFuture;
23  import org.jboss.netty.channel.ChannelFutureListener;
24  import org.jboss.netty.channel.ChannelHandlerContext;
25  import org.jboss.netty.channel.ChannelStateEvent;
26  import org.jboss.netty.channel.ExceptionEvent;
27  import org.jboss.netty.channel.MessageEvent;
28  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
29  import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
30  
31  import java.net.InetSocketAddress;
32  
33  public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
34  
35      private final ClientSocketChannelFactory cf;
36  
37      // This lock guards against the race condition that overrides the
38      // OP_READ flag incorrectly.
39      // See the related discussion: http://markmail.org/message/x7jc6mqx6ripynqf
40      final Object trafficLock = new Object();
41  
42      private volatile Channel outboundChannel;
43  
44      public HexDumpProxyInboundHandler(ClientSocketChannelFactory cf) {
45          this.cf = cf;
46      }
47  
48      @Override
49      public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
50          // Suspend incoming traffic until connected to the remote host.
51          final Channel inboundChannel = e.getChannel();
52          inboundChannel.setReadable(false);
53  
54          // Start the connection attempt.
55          ClientBootstrap cb = new ClientBootstrap(cf);
56          cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
57          ChannelFuture f = cb.connect(new InetSocketAddress(HexDumpProxy.REMOTE_HOST, HexDumpProxy.REMOTE_PORT));
58  
59          outboundChannel = f.getChannel();
60          f.addListener(new ChannelFutureListener() {
61              public void operationComplete(ChannelFuture future) {
62                  if (future.isSuccess()) {
63                      // Connection attempt succeeded:
64                      // Begin to accept incoming traffic.
65                      inboundChannel.setReadable(true);
66                  } else {
67                      // Close the connection if the connection attempt has failed.
68                      inboundChannel.close();
69                  }
70              }
71          });
72      }
73  
74      @Override
75      public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
76          ChannelBuffer msg = (ChannelBuffer) e.getMessage();
77          //System.err.println(">>> " + ChannelBuffers.hexDump(msg));
78          synchronized (trafficLock) {
79              outboundChannel.write(msg);
80              // If outboundChannel is saturated, do not read until notified in
81              // OutboundHandler.channelInterestChanged().
82              if (!outboundChannel.isWritable()) {
83                  e.getChannel().setReadable(false);
84              }
85          }
86      }
87  
88      @Override
89      public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
90          // If inboundChannel is not saturated anymore, continue accepting
91          // the incoming traffic from the outboundChannel.
92          synchronized (trafficLock) {
93              if (e.getChannel().isWritable()) {
94                  if (outboundChannel != null) {
95                      outboundChannel.setReadable(true);
96                  }
97              }
98          }
99      }
100 
101     @Override
102     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
103         if (outboundChannel != null) {
104             closeOnFlush(outboundChannel);
105         }
106     }
107 
108     @Override
109     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
110         e.getCause().printStackTrace();
111         closeOnFlush(e.getChannel());
112     }
113 
114     private class OutboundHandler extends SimpleChannelUpstreamHandler {
115 
116         private final Channel inboundChannel;
117 
118         OutboundHandler(Channel inboundChannel) {
119             this.inboundChannel = inboundChannel;
120         }
121 
122         @Override
123         public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
124             ChannelBuffer msg = (ChannelBuffer) e.getMessage();
125             //System.err.println("<<< " + ChannelBuffers.hexDump(msg));
126             synchronized (trafficLock) {
127                 inboundChannel.write(msg);
128                 // If inboundChannel is saturated, do not read until notified in
129                 // HexDumpProxyInboundHandler.channelInterestChanged().
130                 if (!inboundChannel.isWritable()) {
131                     e.getChannel().setReadable(false);
132                 }
133             }
134         }
135 
136         @Override
137         public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
138             // If outboundChannel is not saturated anymore, continue accepting
139             // the incoming traffic from the inboundChannel.
140             synchronized (trafficLock) {
141                 if (e.getChannel().isWritable()) {
142                     inboundChannel.setReadable(true);
143                 }
144             }
145         }
146 
147         @Override
148         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
149             closeOnFlush(inboundChannel);
150         }
151 
152         @Override
153         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
154             e.getCause().printStackTrace();
155             closeOnFlush(e.getChannel());
156         }
157     }
158 
159     /**
160      * Closes the specified channel after all queued write requests are flushed.
161      */
162     static void closeOnFlush(Channel ch) {
163         if (ch.isConnected()) {
164             ch.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
165         }
166     }
167 }