View Javadoc

1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.traffic;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelHandler.Sharable;
20  import org.jboss.netty.channel.ChannelHandlerContext;
21  import org.jboss.netty.channel.ChannelStateEvent;
22  import org.jboss.netty.channel.MessageEvent;
23  import org.jboss.netty.logging.InternalLogger;
24  import org.jboss.netty.logging.InternalLoggerFactory;
25  import org.jboss.netty.util.ObjectSizeEstimator;
26  import org.jboss.netty.util.Timeout;
27  import org.jboss.netty.util.Timer;
28  import org.jboss.netty.util.TimerTask;
29  
30  import java.util.AbstractCollection;
31  import java.util.Collection;
32  import java.util.Iterator;
33  import java.util.LinkedList;
34  import java.util.List;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicLong;
39  
40  /**
41   * This implementation of the {@link AbstractTrafficShapingHandler} is for global
42   * and per channel traffic shaping, that is to say a global limitation of the bandwidth, whatever
43   * the number of opened channels and a per channel limitation of the bandwidth.<br><br>
44   * This version shall not be in the same pipeline than other TrafficShapingHandler.<br><br>
45   *
46   * The general use should be as follow:<br>
47   * <ul>
48   * <li>Create your unique GlobalChannelTrafficShapingHandler like:<br><br>
49   * <tt>GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);</tt><br><br>
50   * The executor could be the underlying IO worker pool<br>
51   * <tt>pipeline.addLast(myHandler);</tt><br><br>
52   *
53   * <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
54   * and shared among all channels as the counter must be shared among all channels.</b><br><br>
55   *
56   * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
57   * or the check interval (in millisecond) that represents the delay between two computations of the
58   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br>
59   * Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets,
60   * respectively Global and Channel.<br><br>
61   *
62   * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
63   * it is recommended to set a positive value, even if it is high since the precision of the
64   * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
65   * the less precise the traffic shaping will be. It is suggested as higher value something close
66   * to 5 or 10 minutes.<br><br>
67   *
68   * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
69   * </li>
70   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
71   * {@code channelWritabilityChanged(ctx)} to handle writability, or through
72   * {@code future.addListener(new GenericFutureListener())} on the future returned by
73   * {@code ctx.write()}.</li>
74   * <li>You shall also consider to have object size in read or write operations relatively adapted to
75   * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
76   * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.<br><br></li>
77   * <li>Some configuration methods will be taken as best effort, meaning
78   * that all already scheduled traffics will not be
79   * changed, but only applied to new traffics.<br>
80   * So the expected usage of those methods are to be used not too often,
81   * accordingly to the traffic shaping configuration.</li>
82   * </ul><br>
83   *
84   * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
85   * This will not shutdown the {@link Timer} as it may be shared, so you need to do this by your own.
86   */
87  @Sharable
88  public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
89      private static final InternalLogger logger =
90              InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
91      /**
92       * All queues per channel
93       */
94      final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<Integer, PerChannel>();
95  
96      /**
97       * Global queues size
98       */
99      private final AtomicLong queuesSize = new AtomicLong();
100 
101     /**
102      * Maximum cumulative writing bytes for one channel among all (as long as channels stay the same)
103      */
104     private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
105 
106     /**
107      * Maximum cumulative read bytes for one channel among all (as long as channels stay the same)
108      */
109     private final AtomicLong cumulativeReadBytes = new AtomicLong();
110 
111     /**
112      * Max size in the list before proposing to stop writing new objects from next handlers
113      * for all channel (global)
114      */
115     long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
116 
117     /**
118      * Limit in B/s to apply to write
119      */
120     private volatile long writeChannelLimit;
121 
122     /**
123      * Limit in B/s to apply to read
124      */
125     private volatile long readChannelLimit;
126 
127     private static final float DEFAULT_DEVIATION = 0.1F;
128     private static final float MAX_DEVIATION = 0.4F;
129     private static final float DEFAULT_SLOWDOWN = 0.4F;
130     private static final float DEFAULT_ACCELERATION = -0.1F;
131     private volatile float maxDeviation;
132     private volatile float accelerationFactor;
133     private volatile float slowDownFactor;
134     private volatile boolean readDeviationActive;
135     private volatile boolean writeDeviationActive;
136 
137     static final class PerChannel {
138         List<ToSend> messagesQueue;
139         TrafficCounter channelTrafficCounter;
140         long queueSize;
141         long lastWriteTimestamp;
142         long lastReadTimestamp;
143     }
144 
145     /**
146      * Create the global TrafficCounter.
147      */
148     void createGlobalTrafficCounter(Timer timer) {
149         // Default
150         setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION);
151         if (timer == null) {
152             throw new IllegalArgumentException("Timer must not be null");
153         }
154         TrafficCounter tc = new GlobalChannelTrafficCounter(this, timer, "GlobalChannelTC", checkInterval);
155         setTrafficCounter(tc);
156         tc.start();
157     }
158 
159     @Override
160     int userDefinedWritabilityIndex() {
161         return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
162     }
163 
164     /**
165      * Create a new instance.
166      *
167      * @param timer
168      *            the {@link Timer} to use for the {@link TrafficCounter}.
169      * @param writeGlobalLimit
170      *            0 or a limit in bytes/s
171      * @param readGlobalLimit
172      *            0 or a limit in bytes/s
173      * @param writeChannelLimit
174      *            0 or a limit in bytes/s
175      * @param readChannelLimit
176      *            0 or a limit in bytes/s
177      * @param checkInterval
178      *            The delay between two computations of performances for
179      *            channels or 0 if no stats are to be computed.
180      * @param maxTime
181      *            The maximum delay to wait in case of traffic excess.
182      */
183     public GlobalChannelTrafficShapingHandler(Timer timer,
184             long writeGlobalLimit, long readGlobalLimit,
185             long writeChannelLimit, long readChannelLimit,
186             long checkInterval, long maxTime) {
187         super(timer, writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
188         createGlobalTrafficCounter(timer);
189         this.writeChannelLimit = writeChannelLimit;
190         this.readChannelLimit = readChannelLimit;
191     }
192 
193     /**
194      * Create a new instance.
195      *
196      * @param timer
197      *          the {@link Timer} to use for the {@link TrafficCounter}.
198      * @param writeGlobalLimit
199      *            0 or a limit in bytes/s
200      * @param readGlobalLimit
201      *            0 or a limit in bytes/s
202      * @param writeChannelLimit
203      *            0 or a limit in bytes/s
204      * @param readChannelLimit
205      *            0 or a limit in bytes/s
206      * @param checkInterval
207      *          The delay between two computations of performances for
208      *            channels or 0 if no stats are to be computed.
209      */
210     public GlobalChannelTrafficShapingHandler(Timer timer,
211             long writeGlobalLimit, long readGlobalLimit,
212             long writeChannelLimit, long readChannelLimit,
213             long checkInterval) {
214         super(timer, writeGlobalLimit, readGlobalLimit, checkInterval);
215         this.writeChannelLimit = writeChannelLimit;
216         this.readChannelLimit = readChannelLimit;
217         createGlobalTrafficCounter(timer);
218     }
219 
220     /**
221      * Create a new instance.
222      *
223      * @param timer
224      *          the {@link Timer} to use for the {@link TrafficCounter}.
225      * @param writeGlobalLimit
226      *            0 or a limit in bytes/s
227      * @param readGlobalLimit
228      *            0 or a limit in bytes/s
229      * @param writeChannelLimit
230      *            0 or a limit in bytes/s
231      * @param readChannelLimit
232      *            0 or a limit in bytes/s
233      */
234     public GlobalChannelTrafficShapingHandler(Timer timer,
235             long writeGlobalLimit, long readGlobalLimit,
236             long writeChannelLimit, long readChannelLimit) {
237         super(timer, writeGlobalLimit, readGlobalLimit);
238         this.writeChannelLimit = writeChannelLimit;
239         this.readChannelLimit = readChannelLimit;
240         createGlobalTrafficCounter(timer);
241     }
242 
243     /**
244      * Create a new instance.
245      *
246      * @param timer
247      *          the {@link Timer} to use for the {@link TrafficCounter}.
248      * @param checkInterval
249      *          The delay between two computations of performances for
250      *            channels or 0 if no stats are to be computed.
251      */
252     public GlobalChannelTrafficShapingHandler(Timer timer, long checkInterval) {
253         super(timer, checkInterval);
254         createGlobalTrafficCounter(timer);
255     }
256 
257     /**
258      * Create a new instance.
259      *
260      * @param timer
261      *          the {@link Timer} to use for the {@link TrafficCounter}.
262      */
263     public GlobalChannelTrafficShapingHandler(Timer timer) {
264         super(timer);
265         createGlobalTrafficCounter(timer);
266     }
267 
268     /**
269      * @param objectSizeEstimator ObjectSizeEstimator to use
270      * @param timer
271      *            the {@link Timer} to use for the {@link TrafficCounter}.
272      * @param writeLimit write Global Limit
273      *            0 or a limit in bytes/s
274      * @param readLimit read Global Limit
275      *            0 or a limit in bytes/s
276      * @param writeChannelLimit
277      *            0 or a limit in bytes/s
278      * @param readChannelLimit
279      *            0 or a limit in bytes/s
280      * @param checkInterval
281      *            The delay between two computations of performances for
282      *            channels or 0 if no stats are to be computed.
283      * @param maxTime
284      *            The maximum delay to wait in case of traffic excess.
285      */
286     public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer, long writeLimit,
287             long readLimit, long writeChannelLimit, long readChannelLimit, long checkInterval, long maxTime) {
288         super(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
289         this.writeChannelLimit = writeChannelLimit;
290         this.readChannelLimit = readChannelLimit;
291         createGlobalTrafficCounter(timer);
292     }
293 
294     /**
295      * @param objectSizeEstimator ObjectSizeEstimator to use
296      * @param timer
297      *            the {@link Timer} to use for the {@link TrafficCounter}.
298      * @param writeLimit write Global Limit
299      *            0 or a limit in bytes/s
300      * @param readLimit read Global Limit
301      *            0 or a limit in bytes/s
302      * @param writeChannelLimit
303      *            0 or a limit in bytes/s
304      * @param readChannelLimit
305      *            0 or a limit in bytes/s
306      * @param checkInterval
307      *            The delay between two computations of performances for
308      *            channels or 0 if no stats are to be computed.
309      */
310     public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer, long writeLimit,
311             long readLimit, long writeChannelLimit, long readChannelLimit, long checkInterval) {
312         super(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
313         this.writeChannelLimit = writeChannelLimit;
314         this.readChannelLimit = readChannelLimit;
315         createGlobalTrafficCounter(timer);
316     }
317 
318     /**
319      * @param objectSizeEstimator ObjectSizeEstimator to use
320      * @param timer
321      *            the {@link Timer} to use for the {@link TrafficCounter}.
322      * @param writeLimit write Global Limit
323      *            0 or a limit in bytes/s
324      * @param readLimit read Global Limit
325      *            0 or a limit in bytes/s
326      * @param writeChannelLimit
327      *            0 or a limit in bytes/s
328      * @param readChannelLimit
329      *            0 or a limit in bytes/s
330      */
331     public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer, long writeLimit,
332             long readLimit, long writeChannelLimit, long readChannelLimit) {
333         super(objectSizeEstimator, timer, writeLimit, readLimit);
334         this.writeChannelLimit = writeChannelLimit;
335         this.readChannelLimit = readChannelLimit;
336         createGlobalTrafficCounter(timer);
337     }
338 
339     /**
340      * @param objectSizeEstimator ObjectSizeEstimator to use
341      * @param timer
342      *            the {@link Timer} to use for the {@link TrafficCounter}.
343      * @param checkInterval
344      *            The delay between two computations of performances for
345      *            channels or 0 if no stats are to be computed.
346      */
347     public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer,
348             long checkInterval) {
349         super(objectSizeEstimator, timer, checkInterval);
350         createGlobalTrafficCounter(timer);
351     }
352 
353     /**
354      * @param objectSizeEstimator ObjectSizeEstimator to use
355      * @param timer
356      *            the {@link Timer} to use for the {@link TrafficCounter}.
357      */
358     public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer) {
359         super(objectSizeEstimator, timer);
360         createGlobalTrafficCounter(timer);
361     }
362 
363     /**
364      * @return the current max deviation.
365      */
366     public float maxDeviation() {
367         return maxDeviation;
368     }
369 
370     /**
371      * @return the current acceleration factor.
372      */
373     public float accelerationFactor() {
374         return accelerationFactor;
375     }
376 
377     /**
378      * @return the current slow down factor.
379      */
380     public float slowDownFactor() {
381         return slowDownFactor;
382     }
383 
384     /**
385      * @param maxDeviation
386      *            the maximum deviation to allow during computation of average, default deviation
387      *            being 0.1, so +/-10% of the desired bandwidth. Maximum being 0.4.
388      * @param slowDownFactor
389      *            the factor set as +x% to the too fast client (minimal value being 0, meaning no
390      *            slow down factor), default being 40% (0.4).
391      * @param accelerationFactor
392      *            the factor set as -x% to the too slow client (maximal value being 0, meaning no
393      *            acceleration factor), default being -10% (-0.1).
394      */
395     public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) {
396         if (maxDeviation > MAX_DEVIATION) {
397             throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION);
398         }
399         if (slowDownFactor < 0) {
400             throw new IllegalArgumentException("slowDownFactor must be >= 0");
401         }
402         if (accelerationFactor > 0) {
403             throw new IllegalArgumentException("accelerationFactor must be <= 0");
404         }
405         this.maxDeviation = maxDeviation;
406         this.accelerationFactor = 1 + accelerationFactor;
407         this.slowDownFactor = 1 + slowDownFactor;
408     }
409 
410     private void computeDeviationCumulativeBytes() {
411         // compute the maximum cumulativeXxxxBytes among still connected Channels
412         long maxWrittenBytes = 0;
413         long maxReadBytes = 0;
414         long minWrittenBytes = Long.MAX_VALUE;
415         long minReadBytes = Long.MAX_VALUE;
416         for (PerChannel perChannel : channelQueues.values()) {
417             long value = perChannel.channelTrafficCounter.getCumulativeWrittenBytes();
418             if (maxWrittenBytes < value) {
419                 maxWrittenBytes = value;
420             }
421             if (minWrittenBytes > value) {
422                 minWrittenBytes = value;
423             }
424             value = perChannel.channelTrafficCounter.getCumulativeReadBytes();
425             if (maxReadBytes < value) {
426                 maxReadBytes = value;
427             }
428             if (minReadBytes > value) {
429                 minReadBytes = value;
430             }
431         }
432         boolean multiple = channelQueues.size() > 1;
433         readDeviationActive = multiple && minReadBytes < maxReadBytes / 2;
434         writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2;
435         cumulativeWrittenBytes.set(maxWrittenBytes);
436         cumulativeReadBytes.set(maxReadBytes);
437     }
438 
439     @Override
440     protected void doAccounting(TrafficCounter counter) {
441         computeDeviationCumulativeBytes();
442         super.doAccounting(counter);
443     }
444 
445     private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) {
446         if (maxGlobal == 0) {
447             // no change
448             return wait;
449         }
450         float ratio = maxLocal / maxGlobal;
451         // if in the boundaries, same value
452         if (ratio > maxDeviation) {
453             if (ratio < 1 - maxDeviation) {
454                 return wait;
455             } else {
456                 ratio = slowDownFactor;
457                 if (wait < MINIMAL_WAIT) {
458                     wait = MINIMAL_WAIT;
459                 }
460             }
461         } else {
462             ratio = accelerationFactor;
463         }
464         return (long) (wait * ratio);
465     }
466 
467     /**
468      * @return the maxGlobalWriteSize
469      */
470     public long getMaxGlobalWriteSize() {
471         return maxGlobalWriteSize;
472     }
473 
474     /**
475      * Note the change will be taken as best effort, meaning
476      * that all already scheduled traffics will not be
477      * changed, but only applied to new traffics.<br>
478      * So the expected usage of this method is to be used not too often,
479      * accordingly to the traffic shaping configuration.
480      *
481      * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
482      *            globally for all channels before write suspended is set.
483      */
484     public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
485         this.maxGlobalWriteSize = maxGlobalWriteSize;
486     }
487 
488     /**
489      * @return the global size of the buffers for all queues.
490      */
491     public long queuesSize() {
492         return queuesSize.get();
493     }
494 
495     /**
496      * @param newWriteLimit Channel write limit
497      * @param newReadLimit Channel read limit
498      */
499     public void configureChannel(long newWriteLimit, long newReadLimit) {
500         writeChannelLimit = newWriteLimit;
501         readChannelLimit = newReadLimit;
502         long now = TrafficCounter.milliSecondFromNano();
503         for (PerChannel perChannel : channelQueues.values()) {
504             perChannel.channelTrafficCounter.resetAccounting(now);
505         }
506     }
507 
508     /**
509      * @return Channel write limit.
510      */
511     public long getWriteChannelLimit() {
512         return writeChannelLimit;
513     }
514 
515     /**
516      * @param writeLimit Channel write limit
517      */
518     public void setWriteChannelLimit(long writeLimit) {
519         writeChannelLimit = writeLimit;
520         long now = TrafficCounter.milliSecondFromNano();
521         for (PerChannel perChannel : channelQueues.values()) {
522             perChannel.channelTrafficCounter.resetAccounting(now);
523         }
524     }
525 
526     /**
527      * @return Channel read limit.
528      */
529     public long getReadChannelLimit() {
530         return readChannelLimit;
531     }
532 
533     /**
534      * @param readLimit Channel read limit
535      */
536     public void setReadChannelLimit(long readLimit) {
537         readChannelLimit = readLimit;
538         long now = TrafficCounter.milliSecondFromNano();
539         for (PerChannel perChannel : channelQueues.values()) {
540             perChannel.channelTrafficCounter.resetAccounting(now);
541         }
542     }
543 
544     /**
545      * Release all internal resources of this instance.
546      */
547     public final void release() {
548         trafficCounter.stop();
549     }
550 
551     private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
552         // ensure creation is limited to one thread per channel
553         Channel channel = ctx.getChannel();
554         Integer key = channel.hashCode();
555         PerChannel perChannel = channelQueues.get(key);
556         if (perChannel == null) {
557             perChannel = new PerChannel();
558             perChannel.messagesQueue = new LinkedList<ToSend>();
559             // Don't start it since managed through the Global one
560             perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" +
561                     ctx.getChannel().hashCode(), checkInterval);
562             perChannel.queueSize = 0L;
563             perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
564             perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
565             channelQueues.put(key, perChannel);
566         }
567         return perChannel;
568     }
569 
570     @Override
571     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
572         getOrSetPerChannel(ctx);
573         trafficCounter.resetCumulativeTime();
574         super.channelConnected(ctx, e);
575     }
576 
577     @Override
578     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
579         trafficCounter.resetCumulativeTime();
580         Channel channel = ctx.getChannel();
581         Integer key = channel.hashCode();
582         PerChannel perChannel = channelQueues.remove(key);
583         if (perChannel != null) {
584             // write operations need synchronization
585             synchronized (perChannel) {
586                 queuesSize.addAndGet(-perChannel.queueSize);
587                 perChannel.messagesQueue.clear();
588             }
589         }
590         releaseWriteSuspended(ctx);
591         releaseReadSuspended(ctx);
592         super.channelClosed(ctx, e);
593     }
594 
595     @Override
596     public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
597             throws Exception {
598         long now = TrafficCounter.milliSecondFromNano();
599         try {
600             ReadWriteStatus rws = checkAttachment(ctx);
601             long size = calculateSize(evt.getMessage());
602             if (size > 0) {
603                 // compute the number of ms to wait before reopening the channel
604                 // compute the number of ms to wait before reopening the channel
605                 long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
606                 Integer key = ctx.getChannel().hashCode();
607                 PerChannel perChannel = channelQueues.get(key);
608                 long wait = 0;
609                 if (perChannel != null) {
610                     wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
611                     if (readDeviationActive) {
612                         // now try to balance between the channels
613                         long maxLocalRead;
614                         maxLocalRead = perChannel.channelTrafficCounter.getCumulativeReadBytes();
615                         long maxGlobalRead = cumulativeReadBytes.get();
616                         if (maxLocalRead <= 0) {
617                             maxLocalRead = 0;
618                         }
619                         if (maxGlobalRead < maxLocalRead) {
620                             maxGlobalRead = maxLocalRead;
621                         }
622                         wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
623                     }
624                 }
625                 if (wait < waitGlobal) {
626                     wait = waitGlobal;
627                 }
628                 wait = checkWaitReadTime(ctx, wait, now);
629                 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
630                     // time in order to try to limit the traffic
631                     if (release.get()) {
632                         return;
633                     }
634                     Channel channel = ctx.getChannel();
635                     if (channel != null && channel.isConnected()) {
636                         // Only AutoRead AND HandlerActive True means Context Active
637                         if (logger.isDebugEnabled()) {
638                             logger.debug("Read suspend: " + wait + ':' + channel.isReadable() + ':' +
639                                     rws.readSuspend);
640                         }
641                         if (timer == null) {
642                             // Sleep since no executor
643                             // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
644                             Thread.sleep(wait);
645                             return;
646                         }
647                         if (channel.isReadable() && ! rws.readSuspend) {
648                             rws.readSuspend = true;
649                             channel.setReadable(false);
650                             if (logger.isDebugEnabled()) {
651                                 logger.debug("Suspend final status => " + channel.isReadable() + ':' +
652                                         rws.readSuspend);
653                             }
654                             // Create a Runnable to reactive the read if needed. If one was create before
655                             // it will just be reused to limit object creation
656                             if (rws.reopenReadTimerTask == null) {
657                                 rws.reopenReadTimerTask = new ReopenReadTimerTask(ctx);
658                             }
659                             timeout = timer.newTimeout(rws.reopenReadTimerTask, wait,
660                                     TimeUnit.MILLISECONDS);
661                         }
662                     }
663                 }
664             }
665         } finally {
666             informReadOperation(ctx, now);
667             // The message is then forcedly passed to the next handler (not to super)
668             ctx.sendUpstream(evt);
669         }
670     }
671 
672     @Override
673     protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
674         Integer key = ctx.getChannel().hashCode();
675         PerChannel perChannel = channelQueues.get(key);
676         if (perChannel != null) {
677             if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
678                 wait = maxTime;
679             }
680         }
681         return wait;
682     }
683 
684     @Override
685     protected void informReadOperation(final ChannelHandlerContext ctx, final long now) {
686         Integer key = ctx.getChannel().hashCode();
687         PerChannel perChannel = channelQueues.get(key);
688         if (perChannel != null) {
689             perChannel.lastReadTimestamp = now;
690         }
691     }
692 
693     private static final class ToSend {
694         final long relativeTimeAction;
695         final MessageEvent toSend;
696         final long size;
697 
698         private ToSend(final long delay, final MessageEvent toSend, final long size) {
699             relativeTimeAction = delay;
700             this.toSend = toSend;
701             this.size = size;
702         }
703     }
704 
705     protected long maximumCumulativeWrittenBytes() {
706         return cumulativeWrittenBytes.get();
707     }
708 
709     protected long maximumCumulativeReadBytes() {
710         return cumulativeReadBytes.get();
711     }
712 
713     /**
714      * To allow for instance doAccounting to use the TrafficCounter per channel.
715      * @return the list of TrafficCounters that exists at the time of the call.
716      */
717     public Collection<TrafficCounter> channelTrafficCounters() {
718         return new AbstractCollection<TrafficCounter>() {
719             @Override
720             public Iterator<TrafficCounter> iterator() {
721                 return new Iterator<TrafficCounter>() {
722                     final Iterator<PerChannel> iter = channelQueues.values().iterator();
723                     public boolean hasNext() {
724                         return iter.hasNext();
725                     }
726                     public TrafficCounter next() {
727                         return iter.next().channelTrafficCounter;
728                     }
729                     public void remove() {
730                         throw new UnsupportedOperationException();
731                     }
732                 };
733             }
734             @Override
735             public int size() {
736                 return channelQueues.size();
737             }
738         };
739     }
740 
741     @Override
742     public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
743             throws Exception {
744         long wait = 0;
745         long size = calculateSize(evt.getMessage());
746         long now = TrafficCounter.milliSecondFromNano();
747         try {
748             if (size > 0) {
749                 // compute the number of ms to wait before continue with the channel
750                 long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now);
751                 Integer key = ctx.getChannel().hashCode();
752                 PerChannel perChannel = channelQueues.get(key);
753                 if (perChannel != null) {
754                     wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
755                     if (writeDeviationActive) {
756                         // now try to balance between the channels
757                         long maxLocalWrite;
758                         maxLocalWrite = perChannel.channelTrafficCounter.getCumulativeWrittenBytes();
759                         long maxGlobalWrite = cumulativeWrittenBytes.get();
760                         if (maxLocalWrite <= 0) {
761                             maxLocalWrite = 0;
762                         }
763                         if (maxGlobalWrite < maxLocalWrite) {
764                             maxGlobalWrite = maxLocalWrite;
765                         }
766                         wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait);
767                     }
768                 }
769                 if (wait < waitGlobal) {
770                     wait = waitGlobal;
771                 }
772                 if (wait < MINIMAL_WAIT || release.get()) {
773                     wait = 0;
774                 }
775             }
776         } finally {
777             // The message is scheduled
778             submitWrite(ctx, evt, size, wait, now);
779         }
780     }
781 
782     @Override
783     protected void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
784             final long size, final long writedelay, final long now) throws Exception {
785         Channel channel = ctx.getChannel();
786         Integer key = channel.hashCode();
787         PerChannel perChannel = channelQueues.get(key);
788         if (perChannel == null) {
789             // in case write occurs before handlerAdded is raized for this handler
790             // imply a synchronized only if needed
791             perChannel = getOrSetPerChannel(ctx);
792         }
793         final ToSend newToSend;
794         long delay = writedelay;
795         boolean globalSizeExceeded = false;
796         // write operations need synchronization
797         synchronized (perChannel) {
798             if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
799                 if (!channel.isConnected()) {
800                     // ignore
801                     return;
802                 }
803                 trafficCounter.bytesRealWriteFlowControl(size);
804                 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
805                 ctx.sendDownstream(evt);
806                 perChannel.lastWriteTimestamp = now;
807                 return;
808             }
809             if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
810                 delay = maxTime;
811             }
812             if (timer == null) {
813                 // Sleep since no executor
814                 Thread.sleep(delay);
815                 if (!ctx.getChannel().isConnected()) {
816                     // ignore
817                     return;
818                 }
819                 trafficCounter.bytesRealWriteFlowControl(size);
820                 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
821                 ctx.sendDownstream(evt);
822                 perChannel.lastWriteTimestamp = now;
823                 return;
824             }
825             if (!ctx.getChannel().isConnected()) {
826                 // ignore
827                 return;
828             }
829             newToSend = new ToSend(delay + now, evt, size);
830             perChannel.messagesQueue.add(newToSend);
831             perChannel.queueSize += size;
832             queuesSize.addAndGet(size);
833             checkWriteSuspend(ctx, delay, perChannel.queueSize);
834             if (queuesSize.get() > maxGlobalWriteSize) {
835                 globalSizeExceeded = true;
836             }
837         }
838         if (globalSizeExceeded) {
839             setWritable(ctx, false);
840         }
841         final long futureNow = newToSend.relativeTimeAction;
842         final PerChannel forSchedule = perChannel;
843         timer.newTimeout(new TimerTask() {
844             public void run(Timeout timeout) throws Exception {
845                 sendAllValid(ctx, forSchedule, futureNow);
846             }
847         }, delay, TimeUnit.MILLISECONDS);
848     }
849 
850     private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now)
851             throws Exception {
852         // write operations need synchronization
853         synchronized (perChannel) {
854             while (!perChannel.messagesQueue.isEmpty()) {
855                 ToSend newToSend = perChannel.messagesQueue.remove(0);
856                 if (newToSend.relativeTimeAction <= now) {
857                     if (! ctx.getChannel().isConnected()) {
858                         // ignore
859                         break;
860                     }
861                     long size = newToSend.size;
862                     trafficCounter.bytesRealWriteFlowControl(size);
863                     perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
864                     perChannel.queueSize -= size;
865                     queuesSize.addAndGet(-size);
866                     ctx.sendDownstream(newToSend.toSend);
867                     perChannel.lastWriteTimestamp = now;
868                 } else {
869                     perChannel.messagesQueue.add(0, newToSend);
870                     break;
871                 }
872             }
873             if (perChannel.messagesQueue.isEmpty()) {
874                 releaseWriteSuspended(ctx);
875             }
876         }
877     }
878 
879     @Override
880     public String toString() {
881         return new StringBuilder(340).append(super.toString())
882             .append(" Write Channel Limit: ").append(writeChannelLimit)
883             .append(" Read Channel Limit: ").append(readChannelLimit).toString();
884     }
885 }