View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelEvent;
20  import org.jboss.netty.channel.ChannelHandlerContext;
21  import org.jboss.netty.channel.ChannelState;
22  import org.jboss.netty.channel.ChannelStateEvent;
23  import org.jboss.netty.channel.MessageEvent;
24  import org.jboss.netty.channel.SimpleChannelHandler;
25  import org.jboss.netty.logging.InternalLogger;
26  import org.jboss.netty.logging.InternalLoggerFactory;
27  import org.jboss.netty.util.DefaultObjectSizeEstimator;
28  import org.jboss.netty.util.ExternalResourceReleasable;
29  import org.jboss.netty.util.ObjectSizeEstimator;
30  import org.jboss.netty.util.Timeout;
31  import org.jboss.netty.util.Timer;
32  import org.jboss.netty.util.TimerTask;
33  
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  /**
38   * AbstractTrafficShapingHandler allows to limit the global bandwidth
39   * (see {@link GlobalTrafficShapingHandler}) or per session
40   * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41   * It allows too to implement an almost real time monitoring of the bandwidth using
42   * the monitors from {@link TrafficCounter} that will call back every checkInterval
43   * the method doAccounting of this handler.<br>
44   * <br>
45   *
46   * An {@link ObjectSizeEstimator} can be passed at construction to specify what
47   * is the size of the object to be read or write accordingly to the type of
48   * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
49   *
50   * If you want for any particular reasons to stop the monitoring (accounting) or to change
51   * the read/write limit or the check interval, several methods allow that for you:<br>
52   * <ul>
53   * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54   * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55   * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56   * <li></li>
57   * </ul>
58   */
59  public abstract class AbstractTrafficShapingHandler extends
60          SimpleChannelHandler implements ExternalResourceReleasable {
61      /**
62       * Internal logger
63       */
64      static InternalLogger logger = InternalLoggerFactory
65              .getInstance(AbstractTrafficShapingHandler.class);
66  
67      /**
68       * Default delay between two checks: 1s
69       */
70      public static final long DEFAULT_CHECK_INTERVAL = 1000;
71      /**
72       * Default max delay in case of traffic shaping
73       * (during which no communication will occur).
74       * Shall be less than TIMEOUT. Here half of "standard" 30s
75       */
76      public static final long DEFAULT_MAX_TIME = 15000;
77  
78      /**
79       * Default minimal time to wait
80       */
81      static final long MINIMAL_WAIT = 10;
82  
83      /**
84       * Traffic Counter
85       */
86      protected TrafficCounter trafficCounter;
87  
88      /**
89       * ObjectSizeEstimator
90       */
91      private ObjectSizeEstimator objectSizeEstimator;
92  
93      /**
94       * Timer associated to any TrafficCounter
95       */
96      protected Timer timer;
97  
98      /**
99       * used in releaseExternalResources() to cancel the timer
100      */
101     private volatile Timeout timeout;
102 
103     /**
104      * Limit in B/s to apply to write
105      */
106     private long writeLimit;
107 
108     /**
109      * Limit in B/s to apply to read
110      */
111     private long readLimit;
112 
113     /**
114      * Delay between two performance snapshots
115      */
116     protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
117     /**
118      * Max delay in wait
119      */
120     protected long maxTime = DEFAULT_MAX_TIME; // default 15 s
121 
122     /**
123      * Boolean associated with the release of this TrafficShapingHandler.
124      * It will be true only once when the releaseExternalRessources is called
125      * to prevent waiting when shutdown.
126      */
127     final AtomicBoolean release = new AtomicBoolean(false);
128 
129      private void init(ObjectSizeEstimator newObjectSizeEstimator,
130              Timer newTimer, long newWriteLimit, long newReadLimit,
131              long newCheckInterval, long newMaxTime) {
132          objectSizeEstimator = newObjectSizeEstimator;
133          timer = newTimer;
134          writeLimit = newWriteLimit;
135          readLimit = newReadLimit;
136          checkInterval = newCheckInterval;
137          maxTime = newMaxTime;
138          //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
139      }
140 
141     /**
142      *
143      * @param newTrafficCounter the TrafficCounter to set
144      */
145     void setTrafficCounter(TrafficCounter newTrafficCounter) {
146         trafficCounter = newTrafficCounter;
147     }
148 
149     /**
150      * Constructor using default {@link ObjectSizeEstimator}
151      *
152      * @param timer
153      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
154      * @param writeLimit
155      *          0 or a limit in bytes/s
156      * @param readLimit
157      *          0 or a limit in bytes/s
158      * @param checkInterval
159      *          The delay between two computations of performances for
160      *            channels or 0 if no stats are to be computed
161      */
162     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
163                                             long readLimit, long checkInterval) {
164         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
165                 DEFAULT_MAX_TIME);
166     }
167 
168     /**
169      * Constructor using the specified ObjectSizeEstimator
170      *
171      * @param objectSizeEstimator
172      *            the {@link ObjectSizeEstimator} that will be used to compute
173      *            the size of the message
174      * @param timer
175      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
176      * @param writeLimit
177      *          0 or a limit in bytes/s
178      * @param readLimit
179      *          0 or a limit in bytes/s
180      * @param checkInterval
181      *          The delay between two computations of performances for
182      *            channels or 0 if no stats are to be computed
183      */
184     protected AbstractTrafficShapingHandler(
185             ObjectSizeEstimator objectSizeEstimator, Timer timer,
186             long writeLimit, long readLimit, long checkInterval) {
187         init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
188     }
189 
190     /**
191      * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
192      *
193      * @param timer
194      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
195      * @param writeLimit
196      *          0 or a limit in bytes/s
197      * @param readLimit
198      *          0 or a limit in bytes/s
199      */
200     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
201                                             long readLimit) {
202         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
203                 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
204     }
205 
206     /**
207      * Constructor using the specified ObjectSizeEstimator and using default Check Interval
208      *
209      * @param objectSizeEstimator
210      *            the {@link ObjectSizeEstimator} that will be used to compute
211      *            the size of the message
212      * @param timer
213      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
214      * @param writeLimit
215      *          0 or a limit in bytes/s
216      * @param readLimit
217      *          0 or a limit in bytes/s
218      */
219     protected AbstractTrafficShapingHandler(
220             ObjectSizeEstimator objectSizeEstimator, Timer timer,
221             long writeLimit, long readLimit) {
222         init(objectSizeEstimator, timer, writeLimit, readLimit,
223                 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
224     }
225 
226     /**
227      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
228      *
229      * @param timer
230      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
231      */
232     protected AbstractTrafficShapingHandler(Timer timer) {
233         init(new DefaultObjectSizeEstimator(), timer, 0, 0,
234                 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
235     }
236 
237     /**
238      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
239      *
240      * @param objectSizeEstimator
241      *            the {@link ObjectSizeEstimator} that will be used to compute
242      *            the size of the message
243      * @param timer
244      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
245      */
246     protected AbstractTrafficShapingHandler(
247             ObjectSizeEstimator objectSizeEstimator, Timer timer) {
248         init(objectSizeEstimator, timer, 0, 0,
249                 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
250     }
251 
252     /**
253      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
254      *
255      * @param timer
256      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
257      * @param checkInterval
258      *          The delay between two computations of performances for
259      *            channels or 0 if no stats are to be computed
260      */
261     protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
262         init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
263     }
264 
265     /**
266      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
267      *
268      * @param objectSizeEstimator
269      *            the {@link ObjectSizeEstimator} that will be used to compute
270      *            the size of the message
271      * @param timer
272      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
273      * @param checkInterval
274      *          The delay between two computations of performances for
275      *            channels or 0 if no stats are to be computed
276      */
277     protected AbstractTrafficShapingHandler(
278             ObjectSizeEstimator objectSizeEstimator, Timer timer,
279             long checkInterval) {
280         init(objectSizeEstimator, timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
281     }
282 
283     /**
284      * Constructor using default {@link ObjectSizeEstimator}
285      *
286      * @param timer
287      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
288      * @param writeLimit
289      *          0 or a limit in bytes/s
290      * @param readLimit
291      *          0 or a limit in bytes/s
292      * @param checkInterval
293      *          The delay between two computations of performances for
294      *            channels or 0 if no stats are to be computed
295      * @param maxTime
296      *          The max time to wait in case of excess of traffic (to prevent Time Out event)
297      */
298     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
299                                             long readLimit, long checkInterval, long maxTime) {
300         init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
301                 maxTime);
302     }
303 
304     /**
305      * Constructor using the specified ObjectSizeEstimator
306      *
307      * @param objectSizeEstimator
308      *            the {@link ObjectSizeEstimator} that will be used to compute
309      *            the size of the message
310      * @param timer
311      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
312      * @param writeLimit
313      *          0 or a limit in bytes/s
314      * @param readLimit
315      *          0 or a limit in bytes/s
316      * @param checkInterval
317      *          The delay between two computations of performances for
318      *            channels or 0 if no stats are to be computed
319      * @param maxTime
320      *          The max time to wait in case of excess of traffic (to prevent Time Out event)
321      */
322     protected AbstractTrafficShapingHandler(
323             ObjectSizeEstimator objectSizeEstimator, Timer timer,
324             long writeLimit, long readLimit, long checkInterval, long maxTime) {
325         init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
326     }
327 
328     /**
329      * Change the underlying limitations and check interval.
330      */
331     public void configure(long newWriteLimit, long newReadLimit,
332             long newCheckInterval) {
333         configure(newWriteLimit, newReadLimit);
334         configure(newCheckInterval);
335     }
336 
337     /**
338      * Change the underlying limitations.
339      */
340     public void configure(long newWriteLimit, long newReadLimit) {
341         writeLimit = newWriteLimit;
342         readLimit = newReadLimit;
343         if (trafficCounter != null) {
344             trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
345         }
346     }
347 
348     /**
349      * Change the check interval.
350      */
351     public void configure(long newCheckInterval) {
352         setCheckInterval(newCheckInterval);
353     }
354 
355     /**
356      * @return the writeLimit
357      */
358     public long getWriteLimit() {
359         return writeLimit;
360     }
361 
362     /**
363      * @param writeLimit the writeLimit to set
364      */
365     public void setWriteLimit(long writeLimit) {
366         this.writeLimit = writeLimit;
367         if (trafficCounter != null) {
368             trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
369         }
370     }
371 
372     /**
373      * @return the readLimit
374      */
375     public long getReadLimit() {
376         return readLimit;
377     }
378 
379     /**
380      * @param readLimit the readLimit to set
381      */
382     public void setReadLimit(long readLimit) {
383         this.readLimit = readLimit;
384         if (trafficCounter != null) {
385             trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
386         }
387     }
388 
389     /**
390      * @return the checkInterval
391      */
392     public long getCheckInterval() {
393         return checkInterval;
394     }
395 
396     /**
397      * @param newCheckInterval the checkInterval to set
398      */
399     public void setCheckInterval(long newCheckInterval) {
400         this.checkInterval = newCheckInterval;
401         if (trafficCounter != null) {
402             trafficCounter.configure(checkInterval);
403         }
404     }
405 
406     /**
407      * @return the max delay on wait
408      */
409     public long getMaxTimeWait() {
410         return maxTime;
411     }
412 
413     /**
414     *
415     * @param maxTime
416     *    Max delay in wait, shall be less than TIME OUT in related protocol
417     */
418    public void setMaxTimeWait(long maxTime) {
419        this.maxTime = maxTime;
420    }
421 
422     /**
423      * Called each time the accounting is computed from the TrafficCounters.
424      * This method could be used for instance to implement almost real time accounting.
425      *
426      * @param counter
427      *            the TrafficCounter that computes its performance
428      */
429     protected void doAccounting(TrafficCounter counter) {
430         // NOOP by default
431     }
432 
433     /**
434      * Class to implement setReadable at fix time
435      */
436     private class ReopenReadTimerTask implements TimerTask {
437         final ChannelHandlerContext ctx;
438         ReopenReadTimerTask(ChannelHandlerContext ctx) {
439             this.ctx = ctx;
440         }
441         public void run(Timeout timeoutArg) throws Exception {
442             //logger.warn("Start RRTT: "+release.get());
443             if (release.get()) {
444                 return;
445             }
446             if (!ctx.getChannel().isReadable() && ctx.getAttachment() == null) {
447                 // If isReadable is False and Active is True, user make a direct setReadable(false)
448                 // Then Just reset the status
449                 if (logger.isDebugEnabled()) {
450                     logger.debug("Not Unsuspend: " + ctx.getChannel().isReadable() + ":" +
451                             (ctx.getAttachment() == null));
452                 }
453                 ctx.setAttachment(null);
454             } else {
455                 // Anything else allows the handler to reset the AutoRead
456                 if (logger.isDebugEnabled()) {
457                     if (ctx.getChannel().isReadable() && ctx.getAttachment() != null) {
458                         logger.debug("Unsuspend: " + ctx.getChannel().isReadable() + ":" +
459                                 (ctx.getAttachment() == null));
460                     } else {
461                         logger.debug("Normal Unsuspend: " + ctx.getChannel().isReadable() + ":" +
462                                 (ctx.getAttachment() == null));
463                     }
464                 }
465                 ctx.setAttachment(null);
466                 ctx.getChannel().setReadable(true);
467             }
468             if (logger.isDebugEnabled()) {
469                 logger.debug("Unsupsend final status => " + ctx.getChannel().isReadable() + ":" +
470                         (ctx.getAttachment() == null));
471             }
472         }
473     }
474 
475     @Override
476     public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
477             throws Exception {
478         try {
479             long size = objectSizeEstimator.estimateSize(evt.getMessage());
480             if (size > 0 && trafficCounter != null) {
481                 // compute the number of ms to wait before reopening the channel
482                 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime);
483                 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
484                     // time in order to try to limit the traffic
485                     if (release.get()) {
486                         return;
487                     }
488                     Channel channel = ctx.getChannel();
489                     if (channel != null && channel.isConnected()) {
490                         // Only AutoRead AND HandlerActive True means Context Active
491                         if (logger.isDebugEnabled()) {
492                             logger.debug("Read Suspend: " + wait + ":" + channel.isReadable() + ":" +
493                                     (ctx.getAttachment() == null));
494                         }
495                         if (timer == null) {
496                             // Sleep since no executor
497                             // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
498                             Thread.sleep(wait);
499                             return;
500                         }
501                         if (channel.isReadable() && ctx.getAttachment() == null) {
502                             ctx.setAttachment(Boolean.TRUE);
503                             channel.setReadable(false);
504                             if (logger.isDebugEnabled()) {
505                                 logger.debug("Suspend final status => " + channel.isReadable() + ":" +
506                                         (ctx.getAttachment() == null));
507                             }
508                             // Create a Runnable to reactive the read if needed. If one was create before
509                             // it will just be reused to limit object creation
510                             TimerTask timerTask = new ReopenReadTimerTask(ctx);
511                             timeout = timer.newTimeout(timerTask, wait,
512                                     TimeUnit.MILLISECONDS);
513                         }
514                     }
515                 }
516             }
517         } finally {
518             // The message is then just passed to the next handler
519             super.messageReceived(ctx, evt);
520         }
521     }
522 
523     @Override
524     public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
525             throws Exception {
526         long wait = 0;
527         try {
528             long size = objectSizeEstimator.estimateSize(evt.getMessage());
529             if (size > 0 && trafficCounter != null) {
530                 // compute the number of ms to wait before continue with the channel
531                 wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime);
532                 if (logger.isDebugEnabled()) {
533                     logger.debug("Write Suspend: " + wait + ":" + ctx.getChannel().isReadable() + ":" +
534                             (ctx.getAttachment() == null));
535                 }
536                 if (wait >= MINIMAL_WAIT) {
537                     if (release.get()) {
538                         return;
539                     }
540                     /*
541                      * Option 2:
542                      * Thread.sleep(wait);
543                      * System.out.println("Write unsuspended");
544                      * Option 1: use an ordered list of messages to send
545                      * Warning of memory pressure!
546                      */
547                 } else {
548                     wait = 0;
549                 }
550             }
551         } finally {
552             if (release.get()) {
553                 return;
554             }
555             // The message is then just passed to the next handler
556             submitWrite(ctx, evt, wait);
557         }
558     }
559 
560     protected void internalSubmitWrite(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
561         super.writeRequested(ctx, evt);
562     }
563 
564     protected abstract void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
565             throws Exception;
566 
567     @Override
568     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
569             throws Exception {
570         if (e instanceof ChannelStateEvent) {
571             ChannelStateEvent cse = (ChannelStateEvent) e;
572             if (cse.getState() == ChannelState.INTEREST_OPS &&
573                     (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
574 
575                 // setReadable(true) requested
576                 boolean readSuspended = ctx.getAttachment() != null;
577                 if (readSuspended) {
578                     // Drop the request silently if this handler has
579                     // set the flag.
580                     e.getFuture().setSuccess();
581                     return;
582                 }
583             }
584         }
585         super.handleDownstream(ctx, e);
586     }
587 
588     /**
589      *
590      * @return the current TrafficCounter (if
591      *         channel is still connected)
592      */
593     public TrafficCounter getTrafficCounter() {
594         return trafficCounter;
595     }
596 
597     public void releaseExternalResources() {
598         if (trafficCounter != null) {
599             trafficCounter.stop();
600         }
601         release.set(true);
602         if (timeout != null) {
603             timeout.cancel();
604         }
605         //shall be done outside (since it can be shared): timer.stop();
606     }
607 
608     @Override
609     public String toString() {
610         return "TrafficShaping with Write Limit: " + writeLimit +
611                 " Read Limit: " + readLimit + " every: " + checkInterval + " and Counter: " +
612                 (trafficCounter != null? trafficCounter.toString() : "none");
613     }
614 }