View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import java.util.HashMap;
19  import java.util.LinkedList;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.jboss.netty.channel.ChannelHandler.Sharable;
25  import org.jboss.netty.channel.ChannelHandlerContext;
26  import org.jboss.netty.channel.ChannelStateEvent;
27  import org.jboss.netty.channel.MessageEvent;
28  import org.jboss.netty.handler.execution.ExecutionHandler;
29  import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
30  import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
31  import org.jboss.netty.util.ObjectSizeEstimator;
32  import org.jboss.netty.util.Timeout;
33  import org.jboss.netty.util.Timer;
34  import org.jboss.netty.util.TimerTask;
35  
36  
37  /**
38   * This implementation of the {@link AbstractTrafficShapingHandler} is for global
39   * traffic shaping, that is to say a global limitation of the bandwidth, whatever
40   * the number of opened channels.<br><br>
41   *
42   * The general use should be as follows:<br>
43   * <ul>
44   * <li>Create your unique GlobalTrafficShapingHandler like:<br><br>
45   * <tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(timer);</tt><br><br>
46   * timer could be created using <tt>HashedWheelTimer</tt><br>
47   * <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
48   *
49   * <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
50   * and shared among all channels as the counter must be shared among all channels.</b><br><br>
51   *
52   * Other arguments can be passed, such as the write or read limit (in bytes/s, where 0 means no limitation)
53   * or the check interval (in milliseconds), which is the delay between two computations of the
54   * bandwidth and therefore between two callbacks of the doAccounting method (0 means no accounting at all).<br><br>
55   *
56   * A value of 0 for checkInterval means no accounting. If you need traffic shaping but no such accounting,
57   * it is still recommended to set a positive value, even a high one, since the precision of the
58   * traffic shaping depends on the period over which the traffic is computed. The higher the interval,
59   * the less precise the traffic shaping will be. As an upper value, something close
60   * to 5 or 10 minutes is suggested.<br><br>
61   *
62   * maxTimeToWait, set to 15s by default, allows specifying an upper bound for the time shaping.<br><br>
63   * </li>
64   * <li>Add it in your pipeline, before a recommended {@link ExecutionHandler} (like
65   * {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
66   * <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
67   * </li>
68   * <li>When you shutdown your application, release all the external resources
69   * by calling:<br>
70   * <tt>myHandler.releaseExternalResources();</tt><br>
71   * </li>
72   * </ul><br>
73   */
74  @Sharable
75  public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
76      private Map<Integer, List<ToSend>> messagesQueues = new HashMap<Integer, List<ToSend>>();
77  
78      /**
79       * Create the global TrafficCounter
80       */
81      void createGlobalTrafficCounter() {
82          TrafficCounter tc;
83          if (timer != null) {
84              tc = new TrafficCounter(this, timer, "GlobalTC",
85                      checkInterval);
86              setTrafficCounter(tc);
87              tc.start();
88          }
89      }
90  
91      public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
92              long readLimit, long checkInterval) {
93          super(timer, writeLimit, readLimit, checkInterval);
94          createGlobalTrafficCounter();
95      }
96  
97      public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
98              long readLimit, long checkInterval, long maxTime) {
99          super(timer, writeLimit, readLimit, checkInterval, maxTime);
100         createGlobalTrafficCounter();
101     }
102 
103     public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
104             long readLimit) {
105         super(timer, writeLimit, readLimit);
106         createGlobalTrafficCounter();
107     }
108 
109     public GlobalTrafficShapingHandler(Timer timer, long checkInterval) {
110         super(timer, checkInterval);
111         createGlobalTrafficCounter();
112     }
113 
114     public GlobalTrafficShapingHandler(Timer timer) {
115         super(timer);
116         createGlobalTrafficCounter();
117     }
118 
119     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
120             Timer timer, long writeLimit, long readLimit,
121             long checkInterval) {
122         super(objectSizeEstimator, timer, writeLimit, readLimit,
123                 checkInterval);
124         createGlobalTrafficCounter();
125     }
126 
127     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
128             Timer timer, long writeLimit, long readLimit,
129             long checkInterval, long maxTime) {
130         super(objectSizeEstimator, timer, writeLimit, readLimit,
131                 checkInterval, maxTime);
132         createGlobalTrafficCounter();
133     }
134 
135     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
136             Timer timer, long writeLimit, long readLimit) {
137         super(objectSizeEstimator, timer, writeLimit, readLimit);
138         createGlobalTrafficCounter();
139     }
140 
141     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
142             Timer timer, long checkInterval) {
143         super(objectSizeEstimator, timer, checkInterval);
144         createGlobalTrafficCounter();
145     }
146 
147     public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
148             Timer timer) {
149         super(objectSizeEstimator, timer);
150         createGlobalTrafficCounter();
151     }
152 
153     private static final class ToSend {
154         final long date;
155         final MessageEvent toSend;
156 
157         private ToSend(final long delay, final MessageEvent toSend) {
158             this.date = System.currentTimeMillis() + delay;
159             this.toSend = toSend;
160         }
161     }
162 
163     @Override
164     protected synchronized void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
165             throws Exception {
166         Integer key = ctx.getChannel().getId();
167         List<ToSend> messagesQueue = messagesQueues.get(key);
168         if (delay == 0 && (messagesQueue == null || messagesQueue.isEmpty())) {
169             internalSubmitWrite(ctx, evt);
170             return;
171         }
172         if (timer == null) {
173             // Sleep since no executor
174             Thread.sleep(delay);
175             internalSubmitWrite(ctx, evt);
176             return;
177         }
178         if (messagesQueue == null) {
179             messagesQueue = new LinkedList<ToSend>();
180             messagesQueues.put(key, messagesQueue);
181         }
182         final ToSend newToSend = new ToSend(delay, evt);
183         messagesQueue.add(newToSend);
184         final List<ToSend> mqfinal = messagesQueue;
185         timer.newTimeout(new TimerTask() {
186             public void run(Timeout timeout) throws Exception {
187                 sendAllValid(ctx, mqfinal);
188             }
189         }, delay + 1, TimeUnit.MILLISECONDS);
190     }
191 
192     private synchronized void sendAllValid(ChannelHandlerContext ctx, final List<ToSend> messagesQueue)
193             throws Exception {
194         while (!messagesQueue.isEmpty()) {
195             ToSend newToSend = messagesQueue.remove(0);
196             if (newToSend.date <= System.currentTimeMillis()) {
197                 internalSubmitWrite(ctx, newToSend.toSend);
198             } else {
199                 messagesQueue.add(0, newToSend);
200                 break;
201             }
202         }
203     }
204 
205     @Override
206     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
207             throws Exception {
208         Integer key = ctx.getChannel().getId();
209         List<ToSend> messagesQueue = new LinkedList<ToSend>();
210         messagesQueues.put(key, messagesQueue);
211         super.channelConnected(ctx, e);
212     }
213 
214     @Override
215     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
216             throws Exception {
217         Integer key = ctx.getChannel().hashCode();
218         List<ToSend> mq = messagesQueues.remove(key);
219         if (mq != null) {
220             mq.clear();
221         }
222         super.channelClosed(ctx, e);
223     }
224 }