View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelHandlerContext;
20  import org.jboss.netty.channel.ChannelPipelineFactory;
21  import org.jboss.netty.channel.ChannelStateEvent;
22  import org.jboss.netty.channel.MessageEvent;
23  import org.jboss.netty.handler.execution.ExecutionHandler;
24  import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
25  import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
26  import org.jboss.netty.util.ObjectSizeEstimator;
27  import org.jboss.netty.util.Timeout;
28  import org.jboss.netty.util.Timer;
29  import org.jboss.netty.util.TimerTask;
30  
31  import java.util.LinkedList;
32  import java.util.List;
33  import java.util.concurrent.TimeUnit;
34  
35  /**
36   * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
37   * traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
38   *
39   * The general use should be as follow:<br>
40   * <ul>
41   * <li><p>Add in your pipeline a new ChannelTrafficShapingHandler, before a recommended {@link ExecutionHandler} (like
42   * {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).</p>
43   * <p><tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(timer);</tt></p>
44   * <p>timer could be created using <tt>HashedWheelTimer</tt></p>
45   * <p><tt>pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);</tt></p>
46   *
47   * <p><b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
48   * for each new channel as the counter cannot be shared among all channels.</b> For instance, if you have a
49   * {@link ChannelPipelineFactory}, you should create a new ChannelTrafficShapingHandler in this
50   * {@link ChannelPipelineFactory} each time getPipeline() method is called.</p>
51   *
52   * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
53   * or the check interval (in millisecond) that represents the delay between two computations of the
54   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
55   *
56   * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
57   * it is recommended to set a positive value, even if it is high since the precision of the
58   * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
59   * the less precise the traffic shaping will be. It is suggested as higher value something close
60   * to 5 or 10 minutes.</p>
61   *
62   * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
63   * </li>
64   * <li>When you shutdown your application, release all the external resources (except the timer internal itself)
65   * by calling:<br>
66   * <tt>myHandler.releaseExternalResources();</tt><br>
67   * </li>
68   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
69   * {@code channelInterestChanged(ctx, event)} to handle writability, or through
70   * {@code future.addListener(new ChannelFutureListener())} on the future returned by
71   * {@code channel.write()}.</li>
72   * <li><p>You shall also consider to have object size in read or write operations relatively adapted to
73   * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
74   * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
75   * <li><p>Some configuration methods will be taken as best effort, meaning
76   * that all already scheduled traffics will not be
77   * changed, but only applied to new traffics.</p>
78   * So the expected usage of those methods are to be used not too often,
79   * accordingly to the traffic shaping configuration.</li>
80   * </ul><br>
81   */
82  public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
83      private final List<ToSend> messagesQueue = new LinkedList<ToSend>();
84      private long queueSize;
85      private volatile Timeout writeTimeout;
86      private volatile ChannelHandlerContext ctx;
87  
88      public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
89              long readLimit, long checkInterval) {
90          super(timer, writeLimit, readLimit, checkInterval);
91      }
92  
93      public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
94              long readLimit, long checkInterval, long maxTime) {
95          super(timer, writeLimit, readLimit, checkInterval, maxTime);
96      }
97  
98      public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
99              long readLimit) {
100         super(timer, writeLimit, readLimit);
101     }
102 
103     public ChannelTrafficShapingHandler(Timer timer, long checkInterval) {
104         super(timer, checkInterval);
105     }
106 
107     public ChannelTrafficShapingHandler(Timer timer) {
108         super(timer);
109     }
110 
111     public ChannelTrafficShapingHandler(
112             ObjectSizeEstimator objectSizeEstimator, Timer timer,
113             long writeLimit, long readLimit, long checkInterval) {
114         super(objectSizeEstimator, timer, writeLimit, readLimit,
115                 checkInterval);
116     }
117 
118     public ChannelTrafficShapingHandler(
119             ObjectSizeEstimator objectSizeEstimator, Timer timer,
120             long writeLimit, long readLimit, long checkInterval, long maxTime) {
121         super(objectSizeEstimator, timer, writeLimit, readLimit,
122                 checkInterval, maxTime);
123     }
124 
125     public ChannelTrafficShapingHandler(
126             ObjectSizeEstimator objectSizeEstimator, Timer timer,
127             long writeLimit, long readLimit) {
128         super(objectSizeEstimator, timer, writeLimit, readLimit);
129     }
130 
131     public ChannelTrafficShapingHandler(
132             ObjectSizeEstimator objectSizeEstimator, Timer timer,
133             long checkInterval) {
134         super(objectSizeEstimator, timer, checkInterval);
135     }
136 
137     public ChannelTrafficShapingHandler(
138             ObjectSizeEstimator objectSizeEstimator, Timer timer) {
139         super(objectSizeEstimator, timer);
140     }
141 
142     private static final class ToSend {
143         final long relativeTimeAction;
144         final MessageEvent toSend;
145 
146         private ToSend(final long delay, final MessageEvent toSend) {
147             relativeTimeAction = delay;
148             this.toSend = toSend;
149         }
150     }
151 
152     @Override
153     void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long size,
154             final long delay, final long now) throws Exception {
155         if (ctx == null) {
156             this.ctx = ctx;
157         }
158         final ToSend newToSend;
159         Channel channel = ctx.getChannel();
160         synchronized (this) {
161             if (delay == 0 && messagesQueue.isEmpty()) {
162                 if (! channel.isConnected()) {
163                     // ignore
164                     return;
165                 }
166                 if (trafficCounter != null) {
167                     trafficCounter.bytesRealWriteFlowControl(size);
168                 }
169                 ctx.sendDownstream(evt);
170                 return;
171             }
172             if (timer == null) {
173                 // Sleep since no executor
174                 Thread.sleep(delay);
175                 if (! channel.isConnected()) {
176                     // ignore
177                     return;
178                 }
179                 if (trafficCounter != null) {
180                     trafficCounter.bytesRealWriteFlowControl(size);
181                 }
182                 ctx.sendDownstream(evt);
183                 return;
184             }
185             if (! channel.isConnected()) {
186                 // ignore
187                 return;
188             }
189             newToSend = new ToSend(delay + now, evt);
190             messagesQueue.add(newToSend);
191             queueSize += size;
192             checkWriteSuspend(ctx, delay, queueSize);
193         }
194         final long futureNow = newToSend.relativeTimeAction;
195         writeTimeout = timer.newTimeout(new TimerTask() {
196             public void run(Timeout timeout) throws Exception {
197                 sendAllValid(ctx, futureNow);
198             }
199         }, delay + 1, TimeUnit.MILLISECONDS);
200     }
201 
202     private void sendAllValid(ChannelHandlerContext ctx, final long now) throws Exception {
203         Channel channel = ctx.getChannel();
204         if (! channel.isConnected()) {
205             // ignore
206             return;
207         }
208         synchronized (this) {
209             while (!messagesQueue.isEmpty()) {
210                 ToSend newToSend = messagesQueue.remove(0);
211                 if (newToSend.relativeTimeAction <= now) {
212                     long size = calculateSize(newToSend.toSend.getMessage());
213                     if (trafficCounter != null) {
214                         trafficCounter.bytesRealWriteFlowControl(size);
215                     }
216                     queueSize -= size;
217                     if (! channel.isConnected()) {
218                         // ignore
219                         break;
220                     }
221                     ctx.sendDownstream(newToSend.toSend);
222                 } else {
223                     messagesQueue.add(0, newToSend);
224                     break;
225                 }
226             }
227             if (messagesQueue.isEmpty()) {
228                 releaseWriteSuspended(ctx);
229             }
230         }
231     }
232 
233    /**
234     * @return current size in bytes of the write buffer.
235     */
236    public long queueSize() {
237        return queueSize;
238    }
239 
240    @Override
241     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
242             throws Exception {
243         if (trafficCounter != null) {
244             trafficCounter.stop();
245         }
246         synchronized (this) {
247             messagesQueue.clear();
248         }
249         if (writeTimeout != null) {
250             writeTimeout.cancel();
251         }
252         super.channelClosed(ctx, e);
253     }
254 
255     @Override
256     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
257             throws Exception {
258         this.ctx = ctx;
259         // readSuspended = true;
260         ReadWriteStatus rws = checkAttachment(ctx);
261         rws.readSuspend = true;
262         ctx.getChannel().setReadable(false);
263         if (trafficCounter == null) {
264             // create a new counter now
265             if (timer != null) {
266                 trafficCounter = new TrafficCounter(this, timer, "ChannelTC" +
267                         ctx.getChannel().getId(), checkInterval);
268             }
269         }
270         if (trafficCounter != null) {
271             trafficCounter.start();
272         }
273         rws.readSuspend = false;
274         ctx.getChannel().setReadable(true);
275         super.channelConnected(ctx, e);
276     }
277 
278     @Override
279     public void releaseExternalResources() {
280         Channel channel = ctx.getChannel();
281         synchronized (this) {
282             if (ctx != null && ctx.getChannel().isConnected()) {
283                 for (ToSend toSend : messagesQueue) {
284                     if (! channel.isConnected()) {
285                         // ignore
286                         break;
287                     }
288                     ctx.sendDownstream(toSend.toSend);
289                 }
290             }
291             messagesQueue.clear();
292         }
293         if (writeTimeout != null) {
294             writeTimeout.cancel();
295         }
296         super.releaseExternalResources();
297     }
298 
299 }