View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import java.util.LinkedList;
19  import java.util.List;
20  import java.util.concurrent.TimeUnit;
21  
22  import org.jboss.netty.channel.ChannelHandlerContext;
23  import org.jboss.netty.channel.ChannelPipelineFactory;
24  import org.jboss.netty.channel.ChannelStateEvent;
25  import org.jboss.netty.channel.MessageEvent;
26  import org.jboss.netty.handler.execution.ExecutionHandler;
27  import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
28  import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
29  import org.jboss.netty.util.ObjectSizeEstimator;
30  import org.jboss.netty.util.Timeout;
31  import org.jboss.netty.util.Timer;
32  import org.jboss.netty.util.TimerTask;
33  
34  /**
35   * This implementation of the {@link AbstractTrafficShapingHandler} is for channel
36   * traffic shaping, that is to say a per channel limitation of the bandwidth.<br><br>
37   *
38   * The general use should be as follow:<br>
39   * <ul>
40   * <li>Add in your pipeline a new ChannelTrafficShapingHandler, before a recommended {@link ExecutionHandler} (like
41   * {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
42   * <tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(timer);</tt><br>
43   * timer could be created using <tt>HashedWheelTimer</tt><br>
44   * <tt>pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
45   *
46   * <b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
47   * for each new channel as the counter cannot be shared among all channels.</b> For instance, if you have a
48   * {@link ChannelPipelineFactory}, you should create a new ChannelTrafficShapingHandler in this
49   * {@link ChannelPipelineFactory} each time getPipeline() method is called.<br><br>
50   *
51   * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
52   * or the check interval (in millisecond) that represents the delay between two computations of the
53   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br><br>
54   *
55   * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
56   * it is recommended to set a positive value, even if it is high since the precision of the
57   * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
58   * the less precise the traffic shaping will be. It is suggested as higher value something close
59   * to 5 or 10 minutes.<br><br>
60   *
61   * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
62   * </li>
63   * <li>When you shutdown your application, release all the external resources (except the timer internal itself)
64   * by calling:<br>
65   * <tt>myHandler.releaseExternalResources();</tt><br>
66   * </li>
67   * </ul><br>
68   */
69  public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
70      private List<ToSend> messagesQueue = new LinkedList<ToSend>();
71      private volatile Timeout writeTimeout;
72  
73      public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
74              long readLimit, long checkInterval) {
75          super(timer, writeLimit, readLimit, checkInterval);
76      }
77  
78      public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
79              long readLimit, long checkInterval, long maxTime) {
80          super(timer, writeLimit, readLimit, checkInterval, maxTime);
81      }
82  
83      public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
84              long readLimit) {
85          super(timer, writeLimit, readLimit);
86      }
87  
88      public ChannelTrafficShapingHandler(Timer timer, long checkInterval) {
89          super(timer, checkInterval);
90      }
91  
92      public ChannelTrafficShapingHandler(Timer timer) {
93          super(timer);
94      }
95  
96      public ChannelTrafficShapingHandler(
97              ObjectSizeEstimator objectSizeEstimator, Timer timer,
98              long writeLimit, long readLimit, long checkInterval) {
99          super(objectSizeEstimator, timer, writeLimit, readLimit,
100                 checkInterval);
101     }
102 
103     public ChannelTrafficShapingHandler(
104             ObjectSizeEstimator objectSizeEstimator, Timer timer,
105             long writeLimit, long readLimit, long checkInterval, long maxTime) {
106         super(objectSizeEstimator, timer, writeLimit, readLimit,
107                 checkInterval, maxTime);
108     }
109 
110     public ChannelTrafficShapingHandler(
111             ObjectSizeEstimator objectSizeEstimator, Timer timer,
112             long writeLimit, long readLimit) {
113         super(objectSizeEstimator, timer, writeLimit, readLimit);
114     }
115 
116     public ChannelTrafficShapingHandler(
117             ObjectSizeEstimator objectSizeEstimator, Timer timer,
118             long checkInterval) {
119         super(objectSizeEstimator, timer, checkInterval);
120     }
121 
122     public ChannelTrafficShapingHandler(
123             ObjectSizeEstimator objectSizeEstimator, Timer timer) {
124         super(objectSizeEstimator, timer);
125     }
126 
127     private static final class ToSend {
128         final long date;
129         final MessageEvent toSend;
130 
131         private ToSend(final long delay, final MessageEvent toSend) {
132             this.date = System.currentTimeMillis() + delay;
133             this.toSend = toSend;
134         }
135     }
136 
137     @Override
138     protected synchronized void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
139             throws Exception {
140         if (delay == 0 && messagesQueue.isEmpty()) {
141             internalSubmitWrite(ctx, evt);
142             return;
143         }
144         if (timer == null) {
145             // Sleep since no executor
146             Thread.sleep(delay);
147             internalSubmitWrite(ctx, evt);
148             return;
149         }
150         final ToSend newToSend = new ToSend(delay, evt);
151         messagesQueue.add(newToSend);
152         writeTimeout = timer.newTimeout(new TimerTask() {
153             public void run(Timeout timeout) throws Exception {
154                 sendAllValid(ctx);
155             }
156         }, delay + 1, TimeUnit.MILLISECONDS);
157     }
158 
159     private synchronized void sendAllValid(ChannelHandlerContext ctx) throws Exception {
160         while (!messagesQueue.isEmpty()) {
161             ToSend newToSend = messagesQueue.remove(0);
162             if (newToSend.date <= System.currentTimeMillis()) {
163                 internalSubmitWrite(ctx, newToSend.toSend);
164             } else {
165                 messagesQueue.add(0, newToSend);
166                 break;
167             }
168         }
169     }
170 
171     @Override
172     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
173             throws Exception {
174         if (trafficCounter != null) {
175             trafficCounter.stop();
176         }
177         messagesQueue.clear();
178         if (writeTimeout != null) {
179             writeTimeout.cancel();
180         }
181         super.channelClosed(ctx, e);
182     }
183 
184     @Override
185     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
186             throws Exception {
187         // readSuspended = true;
188         ctx.setAttachment(Boolean.TRUE);
189         ctx.getChannel().setReadable(false);
190         if (trafficCounter == null) {
191             // create a new counter now
192             if (timer != null) {
193                 trafficCounter = new TrafficCounter(this, timer, "ChannelTC" +
194                         ctx.getChannel().getId(), checkInterval);
195             }
196         }
197         if (trafficCounter != null) {
198             trafficCounter.start();
199         }
200         // readSuspended = false;
201         ctx.setAttachment(null);
202         ctx.getChannel().setReadable(true);
203         super.channelConnected(ctx, e);
204     }
205 
206 }