View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelEvent;
20  import org.jboss.netty.channel.ChannelHandlerContext;
21  import org.jboss.netty.channel.ChannelState;
22  import org.jboss.netty.channel.ChannelStateEvent;
23  import org.jboss.netty.channel.MessageEvent;
24  import org.jboss.netty.channel.SimpleChannelHandler;
25  import org.jboss.netty.logging.InternalLogger;
26  import org.jboss.netty.logging.InternalLoggerFactory;
27  import org.jboss.netty.util.DefaultObjectSizeEstimator;
28  import org.jboss.netty.util.ExternalResourceReleasable;
29  import org.jboss.netty.util.ObjectSizeEstimator;
30  import org.jboss.netty.util.Timeout;
31  import org.jboss.netty.util.Timer;
32  import org.jboss.netty.util.TimerTask;
33  
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  /**
38   * AbstractTrafficShapingHandler allows to limit the global bandwidth
39   * (see {@link GlobalTrafficShapingHandler}) or per session
40   * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41   * It allows too to implement an almost real time monitoring of the bandwidth using
42   * the monitors from {@link TrafficCounter} that will call back every checkInterval
43   * the method doAccounting of this handler.<br>
44   * <br>
45   *
46   * An {@link ObjectSizeEstimator} can be passed at construction to specify what
47   * is the size of the object to be read or write accordingly to the type of
48   * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
49   *
50   * If you want for any particular reasons to stop the monitoring (accounting) or to change
51   * the read/write limit or the check interval, several methods allow that for you:<br>
52   * <ul>
53   * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54   * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55   * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56   * <li></li>
57   * </ul>
58   */
59  public abstract class AbstractTrafficShapingHandler extends
60          SimpleChannelHandler implements ExternalResourceReleasable {
61      /**
62       * Internal logger
63       */
64      static InternalLogger logger = InternalLoggerFactory
65              .getInstance(AbstractTrafficShapingHandler.class);
66  
67      /**
68       * Default delay between two checks: 1s
69       */
70      public static final long DEFAULT_CHECK_INTERVAL = 1000;
71  
72      /**
73       * Default minimal time to wait
74       */
75      private static final long MINIMAL_WAIT = 10;
76  
77      /**
78       * Traffic Counter
79       */
80      protected TrafficCounter trafficCounter;
81  
82      /**
83       * ObjectSizeEstimator
84       */
85      private ObjectSizeEstimator objectSizeEstimator;
86  
87      /**
88       * Timer to associated to any TrafficCounter
89       */
90      protected Timer timer;
91  
92      /**
93       * used in releaseExternalResources() to cancel the timer
94       */
95      private volatile Timeout timeout;
96  
97      /**
98       * Limit in B/s to apply to write
99       */
100     private long writeLimit;
101 
102     /**
103      * Limit in B/s to apply to read
104      */
105     private long readLimit;
106 
107     /**
108      * Delay between two performance snapshots
109      */
110     protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
111 
112     /**
113      * Boolean associated with the release of this TrafficShapingHandler.
114      * It will be true only once when the releaseExternalRessources is called
115      * to prevent waiting when shutdown.
116      */
117     final AtomicBoolean release = new AtomicBoolean(false);
118 
119      private void init(ObjectSizeEstimator newObjectSizeEstimator,
120              Timer newTimer, long newWriteLimit, long newReadLimit,
121              long newCheckInterval) {
122          objectSizeEstimator = newObjectSizeEstimator;
123          timer = newTimer;
124          writeLimit = newWriteLimit;
125          readLimit = newReadLimit;
126          checkInterval = newCheckInterval;
127          //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
128      }
129 
130     /**
131      *
132      * @param newTrafficCounter the TrafficCounter to set
133      */
134     void setTrafficCounter(TrafficCounter newTrafficCounter) {
135         trafficCounter = newTrafficCounter;
136     }
137 
138     /**
139      * Constructor using default {@link ObjectSizeEstimator}
140      *
141      * @param timer
142      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
143      * @param writeLimit
144      *          0 or a limit in bytes/s
145      * @param readLimit
146      *          0 or a limit in bytes/s
147      * @param checkInterval
148      *          The delay between two computations of performances for
149      *            channels or 0 if no stats are to be computed
150      */
151     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
152                                             long readLimit, long checkInterval) {
153         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
154     }
155 
156     /**
157      * Constructor using the specified ObjectSizeEstimator
158      *
159      * @param objectSizeEstimator
160      *            the {@link ObjectSizeEstimator} that will be used to compute
161      *            the size of the message
162      * @param timer
163      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
164      * @param writeLimit
165      *          0 or a limit in bytes/s
166      * @param readLimit
167      *          0 or a limit in bytes/s
168      * @param checkInterval
169      *          The delay between two computations of performances for
170      *            channels or 0 if no stats are to be computed
171      */
172     protected AbstractTrafficShapingHandler(
173             ObjectSizeEstimator objectSizeEstimator, Timer timer,
174             long writeLimit, long readLimit, long checkInterval) {
175         init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
176     }
177 
178     /**
179      * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
180      *
181      * @param timer
182      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
183      * @param writeLimit
184      *          0 or a limit in bytes/s
185      * @param readLimit
186      *          0 or a limit in bytes/s
187      */
188     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
189                                             long readLimit) {
190         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
191     }
192 
193     /**
194      * Constructor using the specified ObjectSizeEstimator and using default Check Interval
195      *
196      * @param objectSizeEstimator
197      *            the {@link ObjectSizeEstimator} that will be used to compute
198      *            the size of the message
199      * @param timer
200      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
201      * @param writeLimit
202      *          0 or a limit in bytes/s
203      * @param readLimit
204      *          0 or a limit in bytes/s
205      */
206     protected AbstractTrafficShapingHandler(
207             ObjectSizeEstimator objectSizeEstimator, Timer timer,
208             long writeLimit, long readLimit) {
209         init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
210     }
211 
212     /**
213      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
214      *
215      * @param timer
216      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
217      */
218     protected AbstractTrafficShapingHandler(Timer timer) {
219         init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
220     }
221 
222     /**
223      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
224      *
225      * @param objectSizeEstimator
226      *            the {@link ObjectSizeEstimator} that will be used to compute
227      *            the size of the message
228      * @param timer
229      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
230      */
231     protected AbstractTrafficShapingHandler(
232             ObjectSizeEstimator objectSizeEstimator, Timer timer) {
233         init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
234     }
235 
236     /**
237      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
238      *
239      * @param timer
240      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
241      * @param checkInterval
242      *          The delay between two computations of performances for
243      *            channels or 0 if no stats are to be computed
244      */
245     protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
246         init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
247     }
248 
249     /**
250      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
251      *
252      * @param objectSizeEstimator
253      *            the {@link ObjectSizeEstimator} that will be used to compute
254      *            the size of the message
255      * @param timer
256      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
257      * @param checkInterval
258      *          The delay between two computations of performances for
259      *            channels or 0 if no stats are to be computed
260      */
261     protected AbstractTrafficShapingHandler(
262             ObjectSizeEstimator objectSizeEstimator, Timer timer,
263             long checkInterval) {
264         init(objectSizeEstimator, timer, 0, 0, checkInterval);
265     }
266 
267     /**
268      * Change the underlying limitations and check interval.
269      */
270     public void configure(long newWriteLimit, long newReadLimit,
271             long newCheckInterval) {
272         configure(newWriteLimit, newReadLimit);
273         configure(newCheckInterval);
274     }
275 
276     /**
277      * Change the underlying limitations.
278      */
279     public void configure(long newWriteLimit, long newReadLimit) {
280         writeLimit = newWriteLimit;
281         readLimit = newReadLimit;
282         if (trafficCounter != null) {
283             trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
284         }
285     }
286 
287     /**
288      * Change the check interval.
289      */
290     public void configure(long newCheckInterval) {
291         checkInterval = newCheckInterval;
292         if (trafficCounter != null) {
293             trafficCounter.configure(checkInterval);
294         }
295     }
296 
297     /**
298      * Called each time the accounting is computed from the TrafficCounters.
299      * This method could be used for instance to implement almost real time accounting.
300      *
301      * @param counter
302      *            the TrafficCounter that computes its performance
303      */
304     protected void doAccounting(TrafficCounter counter) {
305         // NOOP by default
306     }
307 
308     /**
309      * Class to implement setReadable at fix time
310      */
311     private class ReopenReadTimerTask implements TimerTask {
312         final ChannelHandlerContext ctx;
313         ReopenReadTimerTask(ChannelHandlerContext ctx) {
314             this.ctx = ctx;
315         }
316         public void run(Timeout timeoutArg) throws Exception {
317             //logger.warn("Start RRTT: "+release.get());
318             if (release.get()) {
319                 return;
320             }
321             /*
322             logger.warn("WAKEUP! "+
323                     (ctx != null && ctx.getChannel() != null &&
324                             ctx.getChannel().isConnected()));
325              */
326             if (ctx != null && ctx.getChannel() != null &&
327                     ctx.getChannel().isConnected()) {
328                 //logger.warn(" setReadable TRUE: ");
329                 // readSuspended = false;
330                 ctx.setAttachment(null);
331                 ctx.getChannel().setReadable(true);
332             }
333         }
334     }
335 
336     /**
337      * @return the time that should be necessary to wait to respect limit. Can be negative time
338      */
339     private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) {
340         long interval = curtime - lastTime;
341         if (interval <= 0) {
342             // Time is too short, so just lets continue
343             return 0;
344         }
345         return (bytes * 1000 / limit - interval) / 10 * 10;
346     }
347 
348     @Override
349     public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
350             throws Exception {
351         try {
352             long curtime = System.currentTimeMillis();
353             long size = objectSizeEstimator.estimateSize(evt.getMessage());
354             if (trafficCounter != null) {
355                 trafficCounter.bytesRecvFlowControl(size);
356                 if (readLimit == 0) {
357                     // no action
358                     return;
359                 }
360                 // compute the number of ms to wait before reopening the channel
361                 long wait = getTimeToWait(readLimit,
362                         trafficCounter.getCurrentReadBytes(),
363                         trafficCounter.getLastTime(), curtime);
364                 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
365                                             // time in order to
366                     Channel channel = ctx.getChannel();
367                     // try to limit the traffic
368                     if (channel != null && channel.isConnected()) {
369                         // Channel version
370                         if (timer == null) {
371                             // Sleep since no executor
372                             // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
373                             if (release.get()) {
374                                 return;
375                             }
376                             Thread.sleep(wait);
377                             return;
378                         }
379                         if (ctx.getAttachment() == null) {
380                             // readSuspended = true;
381                             ctx.setAttachment(Boolean.TRUE);
382                             channel.setReadable(false);
383                             // logger.warn("Read will wakeup after "+wait+" ms "+this);
384                             TimerTask timerTask = new ReopenReadTimerTask(ctx);
385                             timeout = timer.newTimeout(timerTask, wait,
386                                     TimeUnit.MILLISECONDS);
387                         } else {
388                             // should be waiting: but can occurs sometime so as
389                             // a FIX
390                             // logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
391                             if (release.get()) {
392                                 return;
393                             }
394                             Thread.sleep(wait);
395                         }
396                     } else {
397                         // Not connected or no channel
398                         // logger.warn("Read sleep "+wait+" ms for "+this);
399                         if (release.get()) {
400                             return;
401                         }
402                         Thread.sleep(wait);
403                     }
404                 }
405             }
406         } finally {
407             // The message is then just passed to the next handler
408             super.messageReceived(ctx, evt);
409         }
410     }
411 
412     @Override
413     public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
414             throws Exception {
415         try {
416             long curtime = System.currentTimeMillis();
417             long size = objectSizeEstimator.estimateSize(evt.getMessage());
418             if (trafficCounter != null) {
419                 trafficCounter.bytesWriteFlowControl(size);
420                 if (writeLimit == 0) {
421                     return;
422                 }
423                 // compute the number of ms to wait before continue with the
424                 // channel
425                 long wait = getTimeToWait(writeLimit,
426                         trafficCounter.getCurrentWrittenBytes(),
427                         trafficCounter.getLastTime(), curtime);
428                 if (wait >= MINIMAL_WAIT) {
429                     // Global or Channel
430                     if (release.get()) {
431                         return;
432                     }
433                     Thread.sleep(wait);
434                 }
435             }
436         } finally {
437             // The message is then just passed to the next handler
438             super.writeRequested(ctx, evt);
439         }
440     }
441     @Override
442     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
443             throws Exception {
444         if (e instanceof ChannelStateEvent) {
445             ChannelStateEvent cse = (ChannelStateEvent) e;
446             if (cse.getState() == ChannelState.INTEREST_OPS &&
447                     (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
448 
449                 // setReadable(true) requested
450                 boolean readSuspended = ctx.getAttachment() != null;
451                 if (readSuspended) {
452                     // Drop the request silently if this handler has
453                     // set the flag.
454                     e.getFuture().setSuccess();
455                     return;
456                 }
457             }
458         }
459         super.handleDownstream(ctx, e);
460     }
461 
462     /**
463      *
464      * @return the current TrafficCounter (if
465      *         channel is still connected)
466      */
467     public TrafficCounter getTrafficCounter() {
468         return trafficCounter;
469     }
470 
471     public void releaseExternalResources() {
472         if (trafficCounter != null) {
473             trafficCounter.stop();
474         }
475         release.set(true);
476         if (timeout != null) {
477             timeout.cancel();
478         }
479         timer.stop();
480     }
481 
482     @Override
483     public String toString() {
484         return "TrafficShaping with Write Limit: " + writeLimit +
485                 " Read Limit: " + readLimit + " and Counter: " +
486                 (trafficCounter != null? trafficCounter.toString() : "none");
487     }
488 }