1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.Channel;
20  import org.jboss.netty.channel.ChannelHandlerContext;
21  import org.jboss.netty.channel.ChannelStateEvent;
22  import org.jboss.netty.channel.MessageEvent;
23  import org.jboss.netty.channel.SimpleChannelHandler;
24  import org.jboss.netty.logging.InternalLogger;
25  import org.jboss.netty.logging.InternalLoggerFactory;
26  import org.jboss.netty.util.DefaultObjectSizeEstimator;
27  import org.jboss.netty.util.ExternalResourceReleasable;
28  import org.jboss.netty.util.ObjectSizeEstimator;
29  import org.jboss.netty.util.Timeout;
30  import org.jboss.netty.util.Timer;
31  import org.jboss.netty.util.TimerTask;
32  
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  /**
 * <p>AbstractTrafficShapingHandler allows limiting the global bandwidth
 * (see {@link GlobalTrafficShapingHandler}) or the per-session
 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
 * It also allows implementing an almost real-time monitoring of the bandwidth using
 * the monitors from {@link TrafficCounter}, which call back the
 * doAccounting method of this handler every checkInterval.</p>
43   *
 * <p>An {@link ObjectSizeEstimator} can be passed at construction to specify
 * the size of the object to be read or written according to the type of
 * object. If not specified, the {@link SimpleObjectSizeEstimator} implementation
 * (a {@link DefaultObjectSizeEstimator} specialized for ChannelBuffer) is used.</p>
47   *
48   * <p>If you want for any particular reasons to stop the monitoring (accounting) or to change
49   * the read/write limit or the check interval, several methods allow that for you:</p>
50   * <ul>
51   * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
52   * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
53   * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
54   * </ul>
55   */
56  public abstract class AbstractTrafficShapingHandler extends
57          SimpleChannelHandler implements ExternalResourceReleasable {
    /**
     * Internal logger, used by this handler and by its nested
     * {@link ReopenReadTimerTask} for debug traces.
     */
    static InternalLogger logger = InternalLoggerFactory
            .getInstance(AbstractTrafficShapingHandler.class);

    /**
     * Default delay between two checks, in milliseconds: 1s
     */
    public static final long DEFAULT_CHECK_INTERVAL = 1000;

    /**
     * Default max delay in case of traffic shaping
     * (during which no communication will occur), in milliseconds.
     * Shall be less than TIMEOUT. Here half of "standard" 30s
     */
    public static final long DEFAULT_MAX_TIME = 15000;

    /**
     * Default max size to not exceed in buffer (write only): 4 MB.
     */
    static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;

    /**
     * Default minimal time to wait, in milliseconds. A computed wait below
     * this value is not applied by messageReceived and writeRequested.
     */
    static final long MINIMAL_WAIT = 10;

    // User defined writability indexes returned by userDefinedWritabilityIndex(),
    // one per kind of traffic shaping handler.
    static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
    static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
    static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
89  
    /**
     * Traffic Counter used to compute the read and write waiting times.
     * May be null, in which case no shaping is applied.
     */
    protected TrafficCounter trafficCounter;

    /**
     * ObjectSizeEstimator supplied at construction
     */
    private ObjectSizeEstimator objectSizeEstimator;

    /**
     * Timer associated to any TrafficCounter; also used to schedule the
     * re-opening of suspended reads. When null, messageReceived sleeps instead.
     */
    protected Timer timer;

    /**
     * Last timeout scheduled by messageReceived;
     * used in releaseExternalResources() to cancel the timer
     */
    volatile Timeout timeout;

    /**
     * Limit in B/s to apply to write
     */
    private volatile long writeLimit;

    /**
     * Limit in B/s to apply to read
     */
    private volatile long readLimit;

    /**
     * Delay between two performance snapshots, in milliseconds
     */
    protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s

    /**
     * Max delay in wait, in milliseconds
     */
    protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s

    /**
     * Max time to delay before proposing to stop writing new objects from next handlers
     */
    volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s

    /**
     * Max size in the list before proposing to stop writing new objects from next handlers
     */
    volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB

    /**
     * Boolean associated with the release of this TrafficShapingHandler.
     * It will be true only once releaseExternalResources is called,
     * to prevent waiting when shutting down.
     */
    final AtomicBoolean release = new AtomicBoolean(false);

    /**
     * User defined writability index of this handler, computed once at
     * construction by userDefinedWritabilityIndex().
     */
    final int index;
147 
148     /**
149      * Attachment of ChannelHandlerContext
150      *
151      */
152     static final class ReadWriteStatus {
153         volatile boolean readSuspend;
154         volatile TimerTask reopenReadTimerTask;
155     }
156 
157     /**
158      * For simple ChannelBuffer, returns the readableBytes, else
159      * use standard DefaultObjectSizeEstimator.
160      */
161     public static class SimpleObjectSizeEstimator extends DefaultObjectSizeEstimator {
162         @Override
163         public int estimateSize(Object o) {
164             int size;
165             if (o instanceof ChannelBuffer) {
166                 size = ((ChannelBuffer) o).readableBytes();
167             } else {
168                 size = super.estimateSize(o);
169             }
170             return size;
171         }
172     }
173 
174     /**
175      * @return the index to be used by the TrafficShapingHandler to manage the user defined
176      *         writability. For Channel TSH it is defined as
177      *         {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}, for Global TSH it is
178      *         defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
179      *         for GlobalChannel TSH it is defined as
180      *         {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
181      */
182     int userDefinedWritabilityIndex() {
183         if (this instanceof GlobalChannelTrafficShapingHandler) {
184             return GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
185         } else if (this instanceof GlobalTrafficShapingHandler) {
186             return GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
187         } else {
188             return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
189         }
190     }
191 
192     private void init(ObjectSizeEstimator newObjectSizeEstimator,
193              Timer newTimer, long newWriteLimit, long newReadLimit,
194              long newCheckInterval, long newMaxTime) {
195         if (newMaxTime <= 0) {
196             throw new IllegalArgumentException("maxTime must be positive");
197         }
198          objectSizeEstimator = newObjectSizeEstimator;
199          timer = newTimer;
200          writeLimit = newWriteLimit;
201          readLimit = newReadLimit;
202          checkInterval = newCheckInterval;
203          maxTime = newMaxTime;
204          //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
205      }
206 
207     /**
208      * @param newTrafficCounter the TrafficCounter to set
209      */
210     void setTrafficCounter(TrafficCounter newTrafficCounter) {
211         trafficCounter = newTrafficCounter;
212     }
213 
214     /**
215      * Constructor using default {@link ObjectSizeEstimator} and
216      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
217      *
218      * @param timer
219      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
220      * @param writeLimit
221      *          0 or a limit in bytes/s
222      * @param readLimit
223      *          0 or a limit in bytes/s
224      * @param checkInterval
225      *          The delay between two computations of performances for
226      *            channels or 0 if no stats are to be computed.
227      */
228     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
229                                             long readLimit, long checkInterval) {
230         index = userDefinedWritabilityIndex();
231         init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
232                 DEFAULT_MAX_TIME);
233     }
234 
235     /**
236      * Constructor using the specified ObjectSizeEstimator and
237      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
238      *
239      * @param objectSizeEstimator
240      *            the {@link ObjectSizeEstimator} that will be used to compute
241      *            the size of the message.
242      * @param timer
243      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
244      * @param writeLimit
245      *          0 or a limit in bytes/s
246      * @param readLimit
247      *          0 or a limit in bytes/s
248      * @param checkInterval
249      *          The delay between two computations of performances for
250      *            channels or 0 if no stats are to be computed.
251      */
252     protected AbstractTrafficShapingHandler(
253             ObjectSizeEstimator objectSizeEstimator, Timer timer,
254             long writeLimit, long readLimit, long checkInterval) {
255         index = userDefinedWritabilityIndex();
256         init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
257     }
258 
259     /**
260      * Constructor using default {@link ObjectSizeEstimator} and using
261      * default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
262      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
263      *
264      * @param timer
265      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
266      * @param writeLimit
267      *          0 or a limit in bytes/s
268      * @param readLimit
269      *          0 or a limit in bytes/s
270      */
271     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
272                                             long readLimit) {
273         index = userDefinedWritabilityIndex();
274         init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit,
275                 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
276     }
277 
278     /**
279      * Constructor using the specified ObjectSizeEstimator and using
280      * default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
281      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
282      *
283      * @param objectSizeEstimator
284      *            the {@link ObjectSizeEstimator} that will be used to compute
285      *            the size of the message.
286      * @param timer
287      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
288      * @param writeLimit
289      *          0 or a limit in bytes/s
290      * @param readLimit
291      *          0 or a limit in bytes/s
292      */
293     protected AbstractTrafficShapingHandler(
294             ObjectSizeEstimator objectSizeEstimator, Timer timer,
295             long writeLimit, long readLimit) {
296         index = userDefinedWritabilityIndex();
297         init(objectSizeEstimator, timer, writeLimit, readLimit,
298                 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
299     }
300 
301     /**
302      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and
303      * default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
304      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
305      *
306      * @param timer
307      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
308      */
309     protected AbstractTrafficShapingHandler(Timer timer) {
310         index = userDefinedWritabilityIndex();
311         init(new SimpleObjectSizeEstimator(), timer, 0, 0,
312                 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
313     }
314 
315     /**
316      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and
317      * default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
318      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
319      *
320      * @param objectSizeEstimator
321      *            the {@link ObjectSizeEstimator} that will be used to compute
322      *            the size of the message.
323      * @param timer
324      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
325      */
326     protected AbstractTrafficShapingHandler(
327             ObjectSizeEstimator objectSizeEstimator, Timer timer) {
328         index = userDefinedWritabilityIndex();
329         init(objectSizeEstimator, timer, 0, 0,
330                 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
331     }
332 
333     /**
334      * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and
335      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
336      *
337      * @param timer
338      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
339      * @param checkInterval
340      *          The delay between two computations of performances for
341      *            channels or 0 if no stats are to be computed.
342      */
343     protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
344         index = userDefinedWritabilityIndex();
345         init(new SimpleObjectSizeEstimator(), timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
346     }
347 
348     /**
349      * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and
350      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
351      *
352      * @param objectSizeEstimator
353      *            the {@link ObjectSizeEstimator} that will be used to compute
354      *            the size of the message.
355      * @param timer
356      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
357      * @param checkInterval
358      *          The delay between two computations of performances for
359      *            channels or 0 if no stats are to be computed.
360      */
361     protected AbstractTrafficShapingHandler(
362             ObjectSizeEstimator objectSizeEstimator, Timer timer,
363             long checkInterval) {
364         index = userDefinedWritabilityIndex();
365         init(objectSizeEstimator, timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
366     }
367 
368     /**
369      * Constructor using default {@link ObjectSizeEstimator}.
370      *
371      * @param timer
372      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
373      * @param writeLimit
374      *          0 or a limit in bytes/s
375      * @param readLimit
376      *          0 or a limit in bytes/s
377      * @param checkInterval
378      *          The delay between two computations of performances for
379      *            channels or 0 if no stats are to be computed.
380      * @param maxTime
381      *          The max time to wait in case of excess of traffic (to prevent Time Out event).
382      *          Must be positive.
383      */
384     protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
385                                             long readLimit, long checkInterval, long maxTime) {
386         index = userDefinedWritabilityIndex();
387         init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
388                 maxTime);
389     }
390 
391     /**
392      * Constructor using the specified ObjectSizeEstimator.
393      *
394      * @param objectSizeEstimator
395      *            the {@link ObjectSizeEstimator} that will be used to compute
396      *            the size of the message.
397      * @param timer
398      *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
399      * @param writeLimit
400      *          0 or a limit in bytes/s
401      * @param readLimit
402      *          0 or a limit in bytes/s
403      * @param checkInterval
404      *          The delay between two computations of performances for
405      *            channels or 0 if no stats are to be computed.
406      * @param maxTime
407      *          The max time to wait in case of excess of traffic (to prevent Time Out event).
408      *          Must be positive.
409      */
410     protected AbstractTrafficShapingHandler(
411             ObjectSizeEstimator objectSizeEstimator, Timer timer,
412             long writeLimit, long readLimit, long checkInterval, long maxTime) {
413         index = userDefinedWritabilityIndex();
414         init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
415     }
416 
417     /**
418      * Change the underlying limitations and check interval.
419      * <p>Note the change will be taken as best effort, meaning
420      * that all already scheduled traffics will not be
421      * changed, but only applied to new traffics.</p>
422      * So the expected usage of this method is to be used not too often,
423      * accordingly to the traffic shaping configuration.
424      *
425      * @param newWriteLimit
426      *            The new write limit (in bytes)
427      * @param newReadLimit
428      *            The new read limit (in bytes)
429      * @param newCheckInterval
430      *            The new check interval (in milliseconds)
431      */
432     public void configure(long newWriteLimit, long newReadLimit,
433             long newCheckInterval) {
434         configure(newWriteLimit, newReadLimit);
435         configure(newCheckInterval);
436     }
437 
438     /**
439      * Change the underlying limitations.
440      * <p>Note the change will be taken as best effort, meaning
441      * that all already scheduled traffics will not be
442      * changed, but only applied to new traffics.</p>
443      * So the expected usage of this method is to be used not too often,
444      * accordingly to the traffic shaping configuration.
445      *
446      * @param newWriteLimit
447      *            The new write limit (in bytes)
448      * @param newReadLimit
449      *            The new read limit (in bytes)
450      */
451     public void configure(long newWriteLimit, long newReadLimit) {
452         writeLimit = newWriteLimit;
453         readLimit = newReadLimit;
454         if (trafficCounter != null) {
455             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
456         }
457     }
458 
459     /**
460      * Change the check interval.
461      */
462     public void configure(long newCheckInterval) {
463         setCheckInterval(newCheckInterval);
464     }
465 
466     /**
467      * @return the writeLimit
468      */
469     public long getWriteLimit() {
470         return writeLimit;
471     }
472 
473     /**
474      * <p>Note the change will be taken as best effort, meaning
475      * that all already scheduled traffics will not be
476      * changed, but only applied to new traffics.</p>
477      * So the expected usage of this method is to be used not too often,
478      * accordingly to the traffic shaping configuration.
479      *
480      * @param writeLimit the writeLimit to set
481      */
482     public void setWriteLimit(long writeLimit) {
483         this.writeLimit = writeLimit;
484         if (trafficCounter != null) {
485             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
486         }
487     }
488 
489     /**
490      * @return the readLimit
491      */
492     public long getReadLimit() {
493         return readLimit;
494     }
495 
496     /**
497      * <p>Note the change will be taken as best effort, meaning
498      * that all already scheduled traffics will not be
499      * changed, but only applied to new traffics.</p>
500      * So the expected usage of this method is to be used not too often,
501      * accordingly to the traffic shaping configuration.
502      *
503      * @param readLimit the readLimit to set
504      */
505     public void setReadLimit(long readLimit) {
506         this.readLimit = readLimit;
507         if (trafficCounter != null) {
508             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
509         }
510     }
511 
512     /**
513      * @return the checkInterval
514      */
515     public long getCheckInterval() {
516         return checkInterval;
517     }
518 
519     /**
520      * @param newCheckInterval the checkInterval to set
521      */
522     public void setCheckInterval(long newCheckInterval) {
523         checkInterval = newCheckInterval;
524         if (trafficCounter != null) {
525             trafficCounter.configure(checkInterval);
526         }
527     }
528 
529     /**
530      * @return the max delay on wait
531      */
532     public long getMaxTimeWait() {
533         return maxTime;
534     }
535 
536    /**
537     * <p>Note the change will be taken as best effort, meaning
538     * that all already scheduled traffics will not be
539     * changed, but only applied to new traffics.</p>
540     * So the expected usage of this method is to be used not too often,
541     * accordingly to the traffic shaping configuration.
542     *
543     * @param maxTime
544     *    Max delay in wait, shall be less than TIME OUT in related protocol.
545     *    Must be positive.
546     */
547    public void setMaxTimeWait(long maxTime) {
548        if (maxTime <= 0) {
549            throw new IllegalArgumentException("maxTime must be positive");
550        }
551        this.maxTime = maxTime;
552    }
553 
554    /**
555     * @return the maxWriteDelay
556     */
557    public long getMaxWriteDelay() {
558        return maxWriteDelay;
559    }
560 
561    /**
562     * <p>Note the change will be taken as best effort, meaning
563     * that all already scheduled traffics will not be
564     * changed, but only applied to new traffics.</p>
565     * So the expected usage of this method is to be used not too often,
566     * accordingly to the traffic shaping configuration.
567     *
568     * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspended is set.
569     *       Must be positive.
570     */
571    public void setMaxWriteDelay(long maxWriteDelay) {
572        if (maxWriteDelay <= 0) {
573            throw new IllegalArgumentException("maxWriteDelay must be positive");
574        }
575        this.maxWriteDelay = maxWriteDelay;
576    }
577 
578    /**
579     * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes
580     */
581    public long getMaxWriteSize() {
582        return maxWriteSize;
583    }
584 
585    /**
586     * <p>Note the change will be taken as best effort, meaning
587     * that all already scheduled traffics will not be
588     * changed, but only applied to new traffics.</p>
589     * So the expected usage of this method is to be used not too often,
590     * accordingly to the traffic shaping configuration.
591     *
592     * @param maxWriteSize the maximum Write Size allowed in the buffer
593     *            per channel before write suspended is set,
594     *            default being {@value #DEFAULT_MAX_SIZE} bytes
595     */
596    public void setMaxWriteSize(long maxWriteSize) {
597        this.maxWriteSize = maxWriteSize;
598    }
599 
600     /**
601      * Called each time the accounting is computed from the TrafficCounters.
602      * This method could be used for instance to implement almost real time accounting.
603      *
604      * @param counter
605      *            the TrafficCounter that computes its performance
606      */
607     protected void doAccounting(TrafficCounter counter) {
608         // NOOP by default
609     }
610 
611     /**
612      * Class to implement setReadable at fix time.
613      */
614     class ReopenReadTimerTask implements TimerTask {
615         final ChannelHandlerContext ctx;
616         ReopenReadTimerTask(ChannelHandlerContext ctx) {
617             this.ctx = ctx;
618         }
619         public void run(Timeout timeoutArg) throws Exception {
620             //logger.warn("Start RRTT: "+release.get());
621             if (release.get()) {
622                 return;
623             }
624             ReadWriteStatus rws = checkAttachment(ctx);
625             Channel channel = ctx.getChannel();
626             if (! channel.isConnected()) {
627                 // ignore
628                 return;
629             }
630             if (!channel.isReadable() && ! rws.readSuspend) {
631                 // If isReadable is False and Active is True, user make a direct setReadable(false)
632                 // Then Just reset the status
633                 if (logger.isDebugEnabled()) {
634                     logger.debug("Not unsuspend: " + channel.isReadable() + ':' +
635                             rws.readSuspend);
636                 }
637                 rws.readSuspend = false;
638             } else {
639                 // Anything else allows the handler to reset the AutoRead
640                 if (logger.isDebugEnabled()) {
641                     if (channel.isReadable() && rws.readSuspend) {
642                         logger.debug("Unsuspend: " + channel.isReadable() + ':' +
643                                 rws.readSuspend);
644                     } else {
645                         logger.debug("Normal unsuspend: " + channel.isReadable() + ':' +
646                                 rws.readSuspend);
647                     }
648                 }
649                 rws.readSuspend = false;
650                 channel.setReadable(true);
651             }
652             if (logger.isDebugEnabled()) {
653                 logger.debug("Unsupsend final status => " + channel.isReadable() + ':' +
654                         rws.readSuspend);
655             }
656         }
657     }
658 
659    /**
660     * Release the Read suspension.
661     */
662     void releaseReadSuspended(ChannelHandlerContext ctx) {
663         ReadWriteStatus rws = checkAttachment(ctx);
664         rws.readSuspend = false;
665         ctx.getChannel().setReadable(true);
666     }
667 
668     @Override
669     public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
670             throws Exception {
671         long now = TrafficCounter.milliSecondFromNano();
672         try {
673             ReadWriteStatus rws = checkAttachment(ctx);
674             long size = calculateSize(evt.getMessage());
675             if (size > 0 && trafficCounter != null) {
676                 // compute the number of ms to wait before reopening the channel
677                 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
678                 wait = checkWaitReadTime(ctx, wait, now);
679                 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
680                     // time in order to try to limit the traffic
681                     if (release.get()) {
682                         return;
683                     }
684                     Channel channel = ctx.getChannel();
685                     if (channel != null && channel.isConnected()) {
686                         // Only AutoRead AND HandlerActive True means Context Active
687                         if (logger.isDebugEnabled()) {
688                             logger.debug("Read suspend: " + wait + ':' + channel.isReadable() + ':' +
689                                     rws.readSuspend);
690                         }
691                         if (timer == null) {
692                             // Sleep since no executor
693                             // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
694                             Thread.sleep(wait);
695                             return;
696                         }
697                         if (channel.isReadable() && ! rws.readSuspend) {
698                             rws.readSuspend = true;
699                             channel.setReadable(false);
700                             if (logger.isDebugEnabled()) {
701                                 logger.debug("Suspend final status => " + channel.isReadable() + ':' +
702                                         rws.readSuspend);
703                             }
704                             // Create a Runnable to reactive the read if needed. If one was create before
705                             // it will just be reused to limit object creation
706                             if (rws.reopenReadTimerTask == null) {
707                                 rws.reopenReadTimerTask = new ReopenReadTimerTask(ctx);
708                             }
709                             timeout = timer.newTimeout(rws.reopenReadTimerTask, wait,
710                                     TimeUnit.MILLISECONDS);
711                         }
712                     }
713                 }
714             }
715         } finally {
716             informReadOperation(ctx, now);
717             // The message is then just passed to the next handler
718             ctx.sendUpstream(evt);
719         }
720     }
721 
722     /**
723      * Method overridden in GTSH to take into account specific timer for the channel.
724      * @param wait the wait delay computed in ms
725      * @param now the relative now time in ms
726      * @return the wait to use according to the context.
727      */
728     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
729         // no change by default
730         return wait;
731     }
732 
733     /**
734      * Method overridden in GTSH to take into account specific timer for the channel.
735      * @param now the relative now time in ms
736      */
737     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
738         // default noop
739     }
740 
741     @Override
742     public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
743             throws Exception {
744         long wait = 0;
745         long size = calculateSize(evt.getMessage());
746         long now = TrafficCounter.milliSecondFromNano();
747         Channel channel = ctx.getChannel();
748         try {
749             if (size > 0 && trafficCounter != null) {
750                 // compute the number of ms to wait before continue with the channel
751                 wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
752                 if (logger.isDebugEnabled()) {
753                     logger.debug("Write suspend: " + wait + ':' + channel.isWritable() + ':' +
754                             channel.getUserDefinedWritability(index));
755                 }
756                 if (wait < MINIMAL_WAIT || release.get()) {
757                     wait = 0;
758                 }
759             }
760         } finally {
761             // The message is scheduled
762             submitWrite(ctx, evt, size, wait, now);
763         }
764     }
765 
    /**
     * Forwards the given write event downstream as is; this method itself applies
     * no delay and does no traffic accounting.
     *
     * @param ctx the handler context used to send the event downstream
     * @param evt the write event to forward
     * @throws Exception declared for callers and overriding subclasses
     * @deprecated marked deprecated in this class. NOTE(review): the intended
     *             replacement is not visible in this part of the file - confirm
     *             before documenting one.
     */
    @Deprecated
    protected void internalSubmitWrite(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
        ctx.sendDownstream(evt);
    }
770 
771     @Deprecated
772     protected void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
773             final long delay) throws Exception {
774         submitWrite(ctx, evt, calculateSize(evt.getMessage()), delay, TrafficCounter.milliSecondFromNano());
775     }
776 
    /**
     * Schedules the given write event; each concrete handler provides the actual
     * strategy. {@code writeRequested} calls this for every outgoing message.
     *
     * @param ctx the handler context of the channel being written to
     * @param evt the write event to schedule
     * @param size the message size as returned by {@code calculateSize}
     * @param delay the time to wait in ms before the write; {@code writeRequested}
     *              passes 0 when the computed wait is below the minimal wait or
     *              when the handler has been released
     * @param now the relative current time in ms, as given by
     *            {@code TrafficCounter.milliSecondFromNano()}
     * @throws Exception presumably when the event cannot be scheduled - the
     *         implementations are outside this part of the file
     */
    abstract void submitWrite(ChannelHandlerContext ctx, MessageEvent evt, long size,
            long delay, long now) throws Exception;
779 
780     void setWritable(ChannelHandlerContext ctx, boolean writable) {
781         Channel channel = ctx.getChannel();
782         if (channel.isConnected()) {
783             channel.setUserDefinedWritability(index, writable);
784         }
785     }
786 
787     /**
788      * Check the writability according to delay and size for the channel.
789      * Set if necessary the write suspended status.
790      * @param delay the computed delai
791      * @param queueSize the current queueSize
792      */
793     void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
794         if (queueSize > maxWriteSize || delay > maxWriteDelay) {
795             setWritable(ctx, false);
796         }
797     }
798 
799     /**
800      * Explicitly release the Write suspended status.
801      */
802     void releaseWriteSuspended(ChannelHandlerContext ctx) {
803         setWritable(ctx, true);
804     }
805 
806     /**
807      * @return the current TrafficCounter (if
808      *         channel is still connected).
809      */
810     public TrafficCounter getTrafficCounter() {
811         return trafficCounter;
812     }
813 
814     public void releaseExternalResources() {
815         if (trafficCounter != null) {
816             trafficCounter.stop();
817         }
818         release.set(true);
819         if (timeout != null) {
820             timeout.cancel();
821         }
822         //shall be done outside (since it can be shared): timer.stop();
823     }
824 
825     static ReadWriteStatus checkAttachment(ChannelHandlerContext ctx) {
826         ReadWriteStatus rws = (ReadWriteStatus) ctx.getAttachment();
827         if (rws == null) {
828             rws = new ReadWriteStatus();
829             ctx.setAttachment(rws);
830         }
831         return rws;
832     }
833 
    /**
     * On connection, makes sure the context carries a {@code ReadWriteStatus}
     * attachment and sets this handler's writability flag to writable, then lets
     * the superclass handle the event.
     *
     * @param ctx the handler context of the connected channel
     * @param e the channel state event, passed on to the superclass
     * @throws Exception if the superclass handling fails
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        // creates the attachment if it is missing; the returned status is not needed here
        checkAttachment(ctx);
        setWritable(ctx, true);
        super.channelConnected(ctx, e);
    }
840 
    /**
     * Estimates the size of the given object by delegating to the
     * {@code objectSizeEstimator} of this handler.
     *
     * @param obj the object (for instance a message) to measure
     * @return the size reported by the estimator
     */
    protected long calculateSize(Object obj) {
        return objectSizeEstimator.estimateSize(obj);
    }
845 
846     @Override
847     public String toString() {
848         StringBuilder builder = new StringBuilder(290)
849             .append("TrafficShaping with Write Limit: ").append(writeLimit)
850             .append(" Read Limit: ").append(readLimit)
851             .append(" CheckInterval: ").append(checkInterval)
852             .append(" maxDelay: ").append(maxWriteDelay)
853             .append(" maxSize: ").append(maxWriteSize)
854             .append(" and Counter: ");
855         if (trafficCounter != null) {
856             builder.append(trafficCounter);
857         } else {
858             builder.append("none");
859         }
860         return builder.toString();
861     }
862 }