View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelHandler.Sharable;
20  import org.jboss.netty.channel.ChannelHandlerContext;
21  import org.jboss.netty.channel.ChannelStateEvent;
22  import org.jboss.netty.channel.MessageEvent;
23  import org.jboss.netty.handler.execution.ExecutionHandler;
24  import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
25  import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
26  import org.jboss.netty.util.ObjectSizeEstimator;
27  import org.jboss.netty.util.Timeout;
28  import org.jboss.netty.util.Timer;
29  import org.jboss.netty.util.TimerTask;
30  
31  import java.util.LinkedList;
32  import java.util.List;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicLong;
37  
38  
39  /**
40   * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
41   * traffic shaping, that is to say a global limitation of the bandwidth, whatever
42   * the number of opened channels.</p>
43   *
44   * The general use should be as follow:
45   * <ul>
46   * <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
47   * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(timer);</tt></p>
48   * <p>timer could be created using <tt>HashedWheelTimer</tt></p>
49   * <p><tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt></p>
50   *
51   * <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
52   * and shared among all channels as the counter must be shared among all channels.</b></p>
53   *
54   * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
55   * or the check interval (in millisecond) that represents the delay between two computations of the
56   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
57   *
58   * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
59   * it is recommended to set a positive value, even if it is high since the precision of the
60   * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
61   * the less precise the traffic shaping will be. It is suggested as higher value something close
62   * to 5 or 10 minutes.</p>
63   *
64   * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
65   * </li>
66   * <li><p>Add it in your pipeline, before a recommended {@link ExecutionHandler} (like
67   * {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).</p>
68   * <p><tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt></p>
69   * </li>
70   * <li><p>When you shutdown your application, release all the external resources
71   * by calling:</p>
72   * <tt>myHandler.releaseExternalResources();</tt>
73   * </li>
74   * </ul>
75   */
76  @Sharable
77  public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
78      private final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<Integer, PerChannel>();
79  
80      /**
81       * Global queues size
82       */
83      private final AtomicLong queuesSize = new AtomicLong();
84  
85      /**
86       * Max size in the list before proposing to stop writing new objects from next handlers
87       * for all channel (global)
88       */
89      long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
90  
91      private static final class PerChannel {
92          List<ToSend> messagesQueue;
93          ChannelHandlerContext ctx;
94          long queueSize;
95          long lastWriteTimestamp;
96          long lastReadTimestamp;
97      }
98      /**
99       * Create the global TrafficCounter.
100      */
101     void createGlobalTrafficCounter() {
102         TrafficCounter tc;
103         if (timer != null) {
104             tc = new TrafficCounter(this, timer, "GlobalTC",
105                     checkInterval);
106             setTrafficCounter(tc);
107             tc.start();
108         }
109     }
110 
111     public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
112             long readLimit, long checkInterval) {
113         super(timer, writeLimit, readLimit, checkInterval);
114         createGlobalTrafficCounter();
115     }
116 
117     public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
118             long readLimit, long checkInterval, long maxTime) {
119         super(timer, writeLimit, readLimit, checkInterval, maxTime);
120         createGlobalTrafficCounter();
121     }
122 
123     public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
124             long readLimit) {
125         super(timer, writeLimit, readLimit);
126         createGlobalTrafficCounter();
127     }
128 
129     public GlobalTrafficShapingHandler(Timer timer, long checkInterval) {
130         super(timer, checkInterval);
131         createGlobalTrafficCounter();
132     }
133 
134     public GlobalTrafficShapingHandler(Timer timer) {
135         super(timer);
136         createGlobalTrafficCounter();
137     }
138 
139     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
140             Timer timer, long writeLimit, long readLimit,
141             long checkInterval) {
142         super(objectSizeEstimator, timer, writeLimit, readLimit,
143                 checkInterval);
144         createGlobalTrafficCounter();
145     }
146 
147     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
148             Timer timer, long writeLimit, long readLimit,
149             long checkInterval, long maxTime) {
150         super(objectSizeEstimator, timer, writeLimit, readLimit,
151                 checkInterval, maxTime);
152         createGlobalTrafficCounter();
153     }
154 
155     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
156             Timer timer, long writeLimit, long readLimit) {
157         super(objectSizeEstimator, timer, writeLimit, readLimit);
158         createGlobalTrafficCounter();
159     }
160 
161     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
162             Timer timer, long checkInterval) {
163         super(objectSizeEstimator, timer, checkInterval);
164         createGlobalTrafficCounter();
165     }
166 
167     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
168             Timer timer) {
169         super(objectSizeEstimator, timer);
170         createGlobalTrafficCounter();
171     }
172 
173     /**
174      * @return the maxGlobalWriteSize default value being 400 MB.
175      */
176     public long getMaxGlobalWriteSize() {
177         return maxGlobalWriteSize;
178     }
179 
180     /**
181      * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
182      *            globally for all channels before write suspended is set,
183      *            default value being 400 MB.
184      */
185     public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
186         this.maxGlobalWriteSize = maxGlobalWriteSize;
187     }
188 
189     /**
190      * @return the global size of the buffers for all queues.
191      */
192     public long queuesSize() {
193         return queuesSize.get();
194     }
195 
196     private synchronized PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
197         Integer key = ctx.getChannel().hashCode();
198         PerChannel perChannel = channelQueues.get(key);
199         if (perChannel == null) {
200             perChannel = new PerChannel();
201             perChannel.messagesQueue = new LinkedList<ToSend>();
202             perChannel.ctx = ctx;
203             perChannel.queueSize = 0L;
204             perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
205             perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
206             channelQueues.put(key, perChannel);
207         }
208         return perChannel;
209     }
210 
211     private static final class ToSend {
212         final long relativeTimeAction;
213         final MessageEvent toSend;
214         final long size;
215 
216         private ToSend(final long delay, final MessageEvent toSend, final long size) {
217             relativeTimeAction = delay;
218             this.toSend = toSend;
219             this.size = size;
220         }
221     }
222 
223     @Override
224     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
225         Integer key = ctx.getChannel().hashCode();
226         PerChannel perChannel = channelQueues.get(key);
227         if (perChannel != null) {
228             if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
229                 wait = maxTime;
230             }
231         }
232         return wait;
233     }
234     @Override
235     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
236         Integer key = ctx.getChannel().hashCode();
237         PerChannel perChannel = channelQueues.get(key);
238         if (perChannel != null) {
239             perChannel.lastReadTimestamp = now;
240         }
241     }
242 
243     @Override
244     void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
245             final long size, final long writedelay, final long now)
246             throws Exception {
247         PerChannel perChannel = getOrSetPerChannel(ctx);
248         long delay;
249         final ToSend newToSend;
250         boolean globalSizeExceeded = false;
251         Channel channel = ctx.getChannel();
252         synchronized (perChannel) {
253             if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
254                 if (! channel.isConnected()) {
255                     // ignore
256                     return;
257                 }
258                 if (trafficCounter != null) {
259                     trafficCounter.bytesRealWriteFlowControl(size);
260                 }
261                 ctx.sendDownstream(evt);
262                 perChannel.lastWriteTimestamp = now;
263                 return;
264             }
265             delay = writedelay;
266             if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
267                 delay = maxTime;
268             }
269             if (timer == null) {
270                 // Sleep since no executor
271                 Thread.sleep(delay);
272                 if (! ctx.getChannel().isConnected()) {
273                     // ignore
274                     return;
275                 }
276                 if (trafficCounter != null) {
277                     trafficCounter.bytesRealWriteFlowControl(size);
278                 }
279                 ctx.sendDownstream(evt);
280                 perChannel.lastWriteTimestamp = now;
281                 return;
282             }
283             if (! ctx.getChannel().isConnected()) {
284                 // ignore
285                 return;
286             }
287             newToSend = new ToSend(delay + now, evt, size);
288             perChannel.messagesQueue.add(newToSend);
289             perChannel.queueSize += size;
290             queuesSize.addAndGet(size);
291             checkWriteSuspend(ctx, delay, perChannel.queueSize);
292             if (queuesSize.get() > maxGlobalWriteSize) {
293                 globalSizeExceeded = true;
294             }
295         }
296         if (globalSizeExceeded) {
297             setWritable(ctx, false);
298         }
299         final long futureNow = newToSend.relativeTimeAction;
300         final PerChannel forSchedule = perChannel;
301         timer.newTimeout(new TimerTask() {
302             public void run(Timeout timeout) throws Exception {
303                 sendAllValid(ctx, forSchedule, futureNow);
304             }
305         }, delay, TimeUnit.MILLISECONDS);
306     }
307 
308     private void sendAllValid(ChannelHandlerContext ctx, final PerChannel perChannel, final long now)
309             throws Exception {
310         Channel channel = ctx.getChannel();
311         if (! channel.isConnected()) {
312             // ignore
313             return;
314         }
315         synchronized (perChannel) {
316             while (!perChannel.messagesQueue.isEmpty()) {
317                 ToSend newToSend = perChannel.messagesQueue.remove(0);
318                 if (newToSend.relativeTimeAction <= now) {
319                     if (! channel.isConnected()) {
320                         // ignore
321                         break;
322                     }
323                     long size = newToSend.size;
324                     if (trafficCounter != null) {
325                         trafficCounter.bytesRealWriteFlowControl(size);
326                     }
327                     perChannel.queueSize -= size;
328                     queuesSize.addAndGet(-size);
329                     ctx.sendDownstream(newToSend.toSend);
330                     perChannel.lastWriteTimestamp = now;
331                 } else {
332                     perChannel.messagesQueue.add(0, newToSend);
333                     break;
334                 }
335             }
336             if (perChannel.messagesQueue.isEmpty()) {
337                 releaseWriteSuspended(ctx);
338             }
339         }
340     }
341 
342     @Override
343     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
344             throws Exception {
345         getOrSetPerChannel(ctx);
346         super.channelConnected(ctx, e);
347     }
348 
349     @Override
350     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
351             throws Exception {
352         Integer key = ctx.getChannel().hashCode();
353         PerChannel perChannel = channelQueues.remove(key);
354         if (perChannel != null) {
355             synchronized (perChannel) {
356                 queuesSize.addAndGet(-perChannel.queueSize);
357                 perChannel.messagesQueue.clear();
358             }
359         }
360         super.channelClosed(ctx, e);
361     }
362 
363     @Override
364     public void releaseExternalResources() {
365         for (PerChannel perChannel : channelQueues.values()) {
366             if (perChannel != null && perChannel.ctx != null && perChannel.ctx.getChannel().isConnected()) {
367                 Channel channel = perChannel.ctx.getChannel();
368                 synchronized (perChannel) {
369                     for (ToSend toSend : perChannel.messagesQueue) {
370                         if (! channel.isConnected()) {
371                             // ignore
372                             break;
373                         }
374                         perChannel.ctx.sendDownstream(toSend.toSend);
375                     }
376                     perChannel.messagesQueue.clear();
377                 }
378             }
379         }
380         channelQueues.clear();
381         queuesSize.set(0);
382         super.releaseExternalResources();
383     }
384 }