View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import java.util.concurrent.TimeUnit;
19  import java.util.concurrent.atomic.AtomicBoolean;
20  
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelEvent;
23  import org.jboss.netty.channel.ChannelHandlerContext;
24  import org.jboss.netty.channel.ChannelState;
25  import org.jboss.netty.channel.ChannelStateEvent;
26  import org.jboss.netty.channel.MessageEvent;
27  import org.jboss.netty.channel.SimpleChannelHandler;
28  import org.jboss.netty.logging.InternalLogger;
29  import org.jboss.netty.logging.InternalLoggerFactory;
30  import org.jboss.netty.util.DefaultObjectSizeEstimator;
31  import org.jboss.netty.util.ExternalResourceReleasable;
32  import org.jboss.netty.util.ObjectSizeEstimator;
33  import org.jboss.netty.util.Timeout;
34  import org.jboss.netty.util.Timer;
35  import org.jboss.netty.util.TimerTask;
36  
37  /**
38   * AbstractTrafficShapingHandler allows to limit the global bandwidth
39   * (see {@link GlobalTrafficShapingHandler}) or per session
40   * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41   * It allows too to implement an almost real time monitoring of the bandwidth using
42   * the monitors from {@link TrafficCounter} that will call back every checkInterval
43   * the method doAccounting of this handler.<br>
44   * <br>
45   *
46   * An {@link ObjectSizeEstimator} can be passed at construction to specify what
47   * is the size of the object to be read or write accordingly to the type of
48   * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
49   *
50   * If you want for any particular reasons to stop the monitoring (accounting) or to change
51   * the read/write limit or the check interval, several methods allow that for you:<br>
52   * <ul>
53   * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54   * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55   * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56   * <li></li>
57   * </ul>
58   */
59  public abstract class AbstractTrafficShapingHandler extends
60          SimpleChannelHandler implements ExternalResourceReleasable {
61      /**
62       * Internal logger
63       */
64      static InternalLogger logger = InternalLoggerFactory
65              .getInstance(AbstractTrafficShapingHandler.class);
66  
67      /**
68       * Default delay between two checks: 1s
69       */
70      public static final long DEFAULT_CHECK_INTERVAL = 1000;
71  
72      /**
73       * Default minimal time to wait
74       */
75      private static final long MINIMAL_WAIT = 10;
76  
77      /**
78       * Traffic Counter
79       */
80      protected TrafficCounter trafficCounter;
81  
82      /**
83       * ObjectSizeEstimator
84       */
85      private ObjectSizeEstimator objectSizeEstimator;
86  
87      /**
88       * Timer to associated to any TrafficCounter
89       */
90      protected Timer timer;
91  
92      /**
93       * used in releaseExternalResources() to cancel the timer
94       */
95      private volatile Timeout timeout;
96  
97      /**
98       * Limit in B/s to apply to write
99       */
100     private long writeLimit;
101 
102     /**
103      * Limit in B/s to apply to read
104      */
105     private long readLimit;
106 
107     /**
108      * Delay between two performance snapshots
109      */
110     protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
111 
112     /**
113      * Boolean associated with the release of this TrafficShapingHandler.
114      * It will be true only once when the releaseExternalRessources is called
115      * to prevent waiting when shutdown.
116      */
117     final AtomicBoolean release = new AtomicBoolean(false);
118 
119      private void init(ObjectSizeEstimator newObjectSizeEstimator,
120              Timer newTimer, long newWriteLimit, long newReadLimit,
121              long newCheckInterval) {
122          objectSizeEstimator = newObjectSizeEstimator;
123          timer = newTimer;
124          writeLimit = newWriteLimit;
125          readLimit = newReadLimit;
126          checkInterval = newCheckInterval;
127          //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
128      }
129 
130     /**
131      *
132      * @param newTrafficCounter the TrafficCounter to set
133      */
134     void setTrafficCounter(TrafficCounter newTrafficCounter) {
135         trafficCounter = newTrafficCounter;
136     }
137 
138     /**
139      * Constructor using default {@link ObjectSizeEstimator}
140      *
141      * @param timer
142      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
143      * @param writeLimit
144      *          0 or a limit in bytes/s
145      * @param readLimit
146      *          0 or a limit in bytes/s
147      * @param checkInterval
148      *          The delay between two computations of performances for
149      *            channels or 0 if no stats are to be computed
150      */
151     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
152                                             long readLimit, long checkInterval) {
153         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
154     }
155 
156     /**
157      * Constructor using the specified ObjectSizeEstimator
158      *
159      * @param objectSizeEstimator
160      *            the {@link ObjectSizeEstimator} that will be used to compute
161      *            the size of the message
162      * @param timer
163      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
164      * @param writeLimit
165      *          0 or a limit in bytes/s
166      * @param readLimit
167      *          0 or a limit in bytes/s
168      * @param checkInterval
169      *          The delay between two computations of performances for
170      *            channels or 0 if no stats are to be computed
171      */
172     protected AbstractTrafficShapingHandler(
173             ObjectSizeEstimator objectSizeEstimator, Timer timer,
174             long writeLimit, long readLimit, long checkInterval) {
175         init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
176     }
177 
178     /**
179      * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
180      *
181      * @param timer
182      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
183      * @param writeLimit
184      *          0 or a limit in bytes/s
185      * @param readLimit
186      *          0 or a limit in bytes/s
187      */
188     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
189                                             long readLimit) {
190         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
191     }
192 
193     /**
194      * Constructor using the specified ObjectSizeEstimator and using default Check Interval
195      *
196      * @param objectSizeEstimator
197      *            the {@link ObjectSizeEstimator} that will be used to compute
198      *            the size of the message
199      * @param timer
200      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
201      * @param writeLimit
202      *          0 or a limit in bytes/s
203      * @param readLimit
204      *          0 or a limit in bytes/s
205      */
206     protected AbstractTrafficShapingHandler(
207             ObjectSizeEstimator objectSizeEstimator, Timer timer,
208             long writeLimit, long readLimit) {
209         init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
210     }
211 
212     /**
213      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
214      *
215      * @param timer
216      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
217      */
218     protected AbstractTrafficShapingHandler(Timer timer) {
219         init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
220     }
221 
222     /**
223      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
224      *
225      * @param objectSizeEstimator
226      *            the {@link ObjectSizeEstimator} that will be used to compute
227      *            the size of the message
228      * @param timer
229      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
230      */
231     protected AbstractTrafficShapingHandler(
232             ObjectSizeEstimator objectSizeEstimator, Timer timer) {
233         init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
234     }
235 
236     /**
237      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
238      *
239      * @param timer
240      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
241      * @param checkInterval
242      *          The delay between two computations of performances for
243      *            channels or 0 if no stats are to be computed
244      */
245     protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
246         init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
247     }
248 
249     /**
250      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
251      *
252      * @param objectSizeEstimator
253      *            the {@link ObjectSizeEstimator} that will be used to compute
254      *            the size of the message
255      * @param timer
256      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
257      * @param checkInterval
258      *          The delay between two computations of performances for
259      *            channels or 0 if no stats are to be computed
260      */
261     protected AbstractTrafficShapingHandler(
262             ObjectSizeEstimator objectSizeEstimator, Timer timer,
263             long checkInterval) {
264         init(objectSizeEstimator, timer, 0, 0, checkInterval);
265     }
266 
267     /**
268      * Change the underlying limitations and check interval.
269      *
270      * @param newWriteLimit
271      * @param newReadLimit
272      * @param newCheckInterval
273      */
274     public void configure(long newWriteLimit, long newReadLimit,
275             long newCheckInterval) {
276         configure(newWriteLimit, newReadLimit);
277         configure(newCheckInterval);
278     }
279 
280     /**
281      * Change the underlying limitations.
282      *
283      * @param newWriteLimit
284      * @param newReadLimit
285      */
286     public void configure(long newWriteLimit, long newReadLimit) {
287         writeLimit = newWriteLimit;
288         readLimit = newReadLimit;
289         if (trafficCounter != null) {
290             trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
291         }
292     }
293 
294     /**
295      * Change the check interval.
296      *
297      * @param newCheckInterval
298      */
299     public void configure(long newCheckInterval) {
300         checkInterval = newCheckInterval;
301         if (trafficCounter != null) {
302             trafficCounter.configure(checkInterval);
303         }
304     }
305 
306     /**
307      * Called each time the accounting is computed from the TrafficCounters.
308      * This method could be used for instance to implement almost real time accounting.
309      *
310      * @param counter
311      *            the TrafficCounter that computes its performance
312      */
313     protected void doAccounting(TrafficCounter counter) {
314         // NOOP by default
315     }
316 
317     /**
318      * Class to implement setReadable at fix time
319      */
320     private class ReopenReadTimerTask implements TimerTask {
321         ChannelHandlerContext ctx;
322         ReopenReadTimerTask(ChannelHandlerContext ctx) {
323             this.ctx = ctx;
324         }
325         public void run(Timeout timeoutArg) throws Exception {
326             //logger.warn("Start RRTT: "+release.get());
327             if (release.get()) {
328                 return;
329             }
330             /*
331             logger.warn("WAKEUP! "+
332                     (ctx != null && ctx.getChannel() != null &&
333                             ctx.getChannel().isConnected()));
334              */
335             if (ctx != null && ctx.getChannel() != null &&
336                     ctx.getChannel().isConnected()) {
337                 //logger.warn(" setReadable TRUE: ");
338                 // readSuspended = false;
339                 ctx.setAttachment(null);
340                 ctx.getChannel().setReadable(true);
341             }
342         }
343     }
344 
345     /**
346     *
347     * @return the time that should be necessary to wait to respect limit. Can
348     *         be negative time
349     */
350     private static long getTimeToWait(long limit, long bytes, long lastTime,
351             long curtime) {
352         long interval = curtime - lastTime;
353         if (interval == 0) {
354             // Time is too short, so just lets continue
355             return 0;
356         }
357         return (bytes * 1000 / limit - interval) / 10 * 10;
358     }
359 
360     @Override
361     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
362             throws Exception {
363         try {
364             long curtime = System.currentTimeMillis();
365             long size = objectSizeEstimator.estimateSize(e.getMessage());
366             if (trafficCounter != null) {
367                 trafficCounter.bytesRecvFlowControl(ctx, size);
368                 if (readLimit == 0) {
369                     // no action
370                     return;
371                 }
372                 // compute the number of ms to wait before reopening the channel
373                 long wait = getTimeToWait(readLimit,
374                         trafficCounter.getCurrentReadBytes(),
375                         trafficCounter.getLastTime(), curtime);
376                 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
377                                             // time in order to
378                     Channel channel = ctx.getChannel();
379                     // try to limit the traffic
380                     if (channel != null && channel.isConnected()) {
381                         // Channel version
382                         if (timer == null) {
383                             // Sleep since no executor
384                             // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
385                             if (release.get()) {
386                                 return;
387                             }
388                             Thread.sleep(wait);
389                             return;
390                         }
391                         if (ctx.getAttachment() == null) {
392                             // readSuspended = true;
393                             ctx.setAttachment(Boolean.TRUE);
394                             channel.setReadable(false);
395                             // logger.warn("Read will wakeup after "+wait+" ms "+this);
396                             TimerTask timerTask = new ReopenReadTimerTask(ctx);
397                             timeout = timer.newTimeout(timerTask, wait,
398                                     TimeUnit.MILLISECONDS);
399                         } else {
400                             // should be waiting: but can occurs sometime so as
401                             // a FIX
402                             // logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
403                             if (release.get()) {
404                                 return;
405                             }
406                             Thread.sleep(wait);
407                         }
408                     } else {
409                         // Not connected or no channel
410                         // logger.warn("Read sleep "+wait+" ms for "+this);
411                         if (release.get()) {
412                             return;
413                         }
414                         Thread.sleep(wait);
415                     }
416                 }
417             }
418         } finally {
419             // The message is then just passed to the next handler
420             super.messageReceived(ctx, e);
421         }
422     }
423 
424     @Override
425     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
426             throws Exception {
427         try {
428             long curtime = System.currentTimeMillis();
429             long size = objectSizeEstimator.estimateSize(e.getMessage());
430             if (trafficCounter != null) {
431                 trafficCounter.bytesWriteFlowControl(size);
432                 if (writeLimit == 0) {
433                     return;
434                 }
435                 // compute the number of ms to wait before continue with the
436                 // channel
437                 long wait = getTimeToWait(writeLimit,
438                         trafficCounter.getCurrentWrittenBytes(),
439                         trafficCounter.getLastTime(), curtime);
440                 if (wait >= MINIMAL_WAIT) {
441                     // Global or Channel
442                     if (release.get()) {
443                         return;
444                     }
445                     Thread.sleep(wait);
446                 }
447             }
448         } finally {
449             // The message is then just passed to the next handler
450             super.writeRequested(ctx, e);
451         }
452     }
453     @Override
454     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
455             throws Exception {
456         if (e instanceof ChannelStateEvent) {
457             ChannelStateEvent cse = (ChannelStateEvent) e;
458             if (cse.getState() == ChannelState.INTEREST_OPS &&
459                     (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
460 
461                 // setReadable(true) requested
462                 boolean readSuspended = ctx.getAttachment() != null;
463                 if (readSuspended) {
464                     // Drop the request silently if this handler has
465                     // set the flag.
466                     e.getFuture().setSuccess();
467                     return;
468                 }
469             }
470         }
471         super.handleDownstream(ctx, e);
472     }
473 
474     /**
475      *
476      * @return the current TrafficCounter (if
477      *         channel is still connected)
478      */
479     public TrafficCounter getTrafficCounter() {
480         return trafficCounter;
481     }
482 
483     public void releaseExternalResources() {
484         if (trafficCounter != null) {
485             trafficCounter.stop();
486         }
487         release.set(true);
488         if (timeout != null) {
489             timeout.cancel();
490         }
491         timer.stop();
492     }
493 
494     @Override
495     public String toString() {
496         return "TrafficShaping with Write Limit: " + writeLimit +
497                 " Read Limit: " + readLimit + " and Counter: " +
498                 (trafficCounter != null? trafficCounter.toString() : "none");
499     }
500 }