View Javadoc

1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelHandler.Sharable;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.util.Attribute;
25  import io.netty.util.concurrent.EventExecutor;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.logging.InternalLogger;
28  import io.netty.util.internal.logging.InternalLoggerFactory;
29  
30  import java.util.AbstractCollection;
31  import java.util.ArrayDeque;
32  import java.util.Collection;
33  import java.util.Iterator;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ScheduledExecutorService;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  /**
40   * This implementation of the {@link AbstractTrafficShapingHandler} is for global
41   * and per channel traffic shaping, that is to say a global limitation of the bandwidth, whatever
42   * the number of opened channels and a per channel limitation of the bandwidth.<br><br>
43   * This version must not be placed in the same pipeline as any other TrafficShapingHandler.<br><br>
44   *
45   * The general use should be as follows:<br>
46   * <ul>
47   * <li>Create your unique GlobalChannelTrafficShapingHandler like:<br><br>
48   * <tt>GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);</tt><br><br>
49   * The executor could be the underlying IO worker pool<br>
50   * <tt>pipeline.addLast(myHandler);</tt><br><br>
51   *
52   * <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
53   * and shared among all channels as the counter must be shared among all channels.</b><br><br>
54   *
55   * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
56   * or the check interval (in millisecond) that represents the delay between two computations of the
57   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br>
58   * Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets,
59   * respectively Global and Channel.<br><br>
60   *
61   * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
62   * it is recommended to set a positive value, even if it is high since the precision of the
63   * Traffic Shaping depends on the period over which the traffic is computed. The higher the interval,
64   * the less precise the traffic shaping will be. A suggested upper bound is something close
65   * to 5 or 10 minutes.<br><br>
66   *
67   * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
68   * </li>
69   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
70   * {@code channelWritabilityChanged(ctx)} to handle writability, or through
71   * {@code future.addListener(new GenericFutureListener())} on the future returned by
72   * {@code ctx.write()}.</li>
73   * <li>You shall also consider to have object size in read or write operations relatively adapted to
74   * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
75   * while having 100 KB objects for 1 MB/s should be smoothly handled by this TrafficShaping handler.<br><br></li>
76   * <li>Some configuration methods will be taken as best effort, meaning
77   * that all already scheduled traffics will not be
78   * changed, but only applied to new traffics.<br>
79   * So the expected usage of those methods are to be used not too often,
80   * accordingly to the traffic shaping configuration.</li>
81   * </ul><br>
82   *
83   * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
84   * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
85   */
86  @Sharable
87  public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
88      private static final InternalLogger logger =
89              InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
90      /**
91       * All queues per channel
92       */
93      final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
94  
95      /**
96       * Global queues size
97       */
98      private final AtomicLong queuesSize = new AtomicLong();
99  
100     /**
101      * Maximum cumulative writing bytes for one channel among all (as long as channels stay the same)
102      */
103     private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
104 
105     /**
106      * Maximum cumulative read bytes for one channel among all (as long as channels stay the same)
107      */
108     private final AtomicLong cumulativeReadBytes = new AtomicLong();
109 
110     /**
111      * Max size in the list before proposing to stop writing new objects from next handlers
112      * for all channel (global)
113      */
114     volatile long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
115 
116     /**
117      * Limit in B/s to apply to write
118      */
119     private volatile long writeChannelLimit;
120 
121     /**
122      * Limit in B/s to apply to read
123      */
124     private volatile long readChannelLimit;
125 
126     private static final float DEFAULT_DEVIATION = 0.1F;
127     private static final float MAX_DEVIATION = 0.4F;
128     private static final float DEFAULT_SLOWDOWN = 0.4F;
129     private static final float DEFAULT_ACCELERATION = -0.1F;
130     private volatile float maxDeviation;
131     private volatile float accelerationFactor;
132     private volatile float slowDownFactor;
133     private volatile boolean readDeviationActive;
134     private volatile boolean writeDeviationActive;
135 
136     static final class PerChannel {
137         ArrayDeque<ToSend> messagesQueue;
138         TrafficCounter channelTrafficCounter;
139         long queueSize;
140         long lastWriteTimestamp;
141         long lastReadTimestamp;
142     }
143 
144     /**
145      * Create the global TrafficCounter
146      */
147     void createGlobalTrafficCounter(ScheduledExecutorService executor) {
148         // Default
149         setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION);
150         if (executor == null) {
151             throw new IllegalArgumentException("Executor must not be null");
152         }
153         TrafficCounter tc = new GlobalChannelTrafficCounter(this, executor, "GlobalChannelTC", checkInterval);
154         setTrafficCounter(tc);
155         tc.start();
156     }
157 
158     @Override
159     protected int userDefinedWritabilityIndex() {
160         return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
161     }
162 
/**
 * Create a new instance with global limits, per channel limits, an accounting
 * interval and a maximum shaping delay.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 * @param writeGlobalLimit
 *            0 or a limit in bytes/s
 * @param readGlobalLimit
 *            0 or a limit in bytes/s
 * @param writeChannelLimit
 *            0 or a limit in bytes/s
 * @param readChannelLimit
 *            0 or a limit in bytes/s
 * @param checkInterval
 *            The delay between two computations of performances for
 *            channels or 0 if no stats are to be computed.
 * @param maxTime
 *            The maximum delay to wait in case of traffic excess.
 * @throws IllegalArgumentException
 *             if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit,
        long checkInterval, long maxTime) {
    super(writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
    // Set the per channel limits before the counter is created: createGlobalTrafficCounter
    // passes this handler to the counter and starts it, so the limits must already be in
    // place at that point (same order as the other constructors).
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(executor);
}
191 
/**
 * Create a new instance with global limits, per channel limits and an accounting
 * interval; the maximum shaping delay is left to the parent class.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 * @param writeGlobalLimit
 *            global write limit: 0 or a limit in bytes/s
 * @param readGlobalLimit
 *            global read limit: 0 or a limit in bytes/s
 * @param writeChannelLimit
 *            per channel write limit: 0 or a limit in bytes/s
 * @param readChannelLimit
 *            per channel read limit: 0 or a limit in bytes/s
 * @param checkInterval
 *            The delay between two computations of performances for
 *            channels or 0 if no stats are to be computed.
 * @throws IllegalArgumentException
 *             if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        ScheduledExecutorService executor,
        long writeGlobalLimit,
        long readGlobalLimit,
        long writeChannelLimit,
        long readChannelLimit,
        long checkInterval) {
    super(writeGlobalLimit, readGlobalLimit, checkInterval);
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(executor);
}
218 
/**
 * Create a new instance with global and per channel limits; the accounting interval
 * and the maximum shaping delay are left to the parent class.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 * @param writeGlobalLimit
 *            global write limit: 0 or a limit in bytes/s
 * @param readGlobalLimit
 *            global read limit: 0 or a limit in bytes/s
 * @param writeChannelLimit
 *            per channel write limit: 0 or a limit in bytes/s
 * @param readChannelLimit
 *            per channel read limit: 0 or a limit in bytes/s
 * @throws IllegalArgumentException
 *             if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        ScheduledExecutorService executor,
        long writeGlobalLimit,
        long readGlobalLimit,
        long writeChannelLimit,
        long readChannelLimit) {
    super(writeGlobalLimit, readGlobalLimit);
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(executor);
}
241 
/**
 * Create a new instance with only an accounting interval; the per channel limits keep
 * their initial value of 0.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 * @param checkInterval
 *            The delay between two computations of performances for
 *            channels or 0 if no stats are to be computed.
 * @throws IllegalArgumentException
 *             if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        ScheduledExecutorService executor,
        long checkInterval) {
    super(checkInterval);
    createGlobalTrafficCounter(executor);
}
255 
/**
 * Create a new instance relying on the parent class defaults; the per channel limits
 * keep their initial value of 0.
 *
 * @param executor
 *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
 * @throws IllegalArgumentException
 *             if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) {
    super();
    createGlobalTrafficCounter(executor);
}
265 
266     /**
267      * @return the current max deviation
268      */
269     public float maxDeviation() {
270         return maxDeviation;
271     }
272 
273     /**
274      * @return the current acceleration factor
275      */
276     public float accelerationFactor() {
277         return accelerationFactor;
278     }
279 
280     /**
281      * @return the current slow down factor
282      */
283     public float slowDownFactor() {
284         return slowDownFactor;
285     }
286 
287     /**
288      * @param maxDeviation
289      *            the maximum deviation to allow during computation of average, default deviation
290      *            being 0.1, so +/-10% of the desired bandwidth. Maximum being 0.4.
291      * @param slowDownFactor
292      *            the factor set as +x% to the too fast client (minimal value being 0, meaning no
293      *            slow down factor), default being 40% (0.4).
294      * @param accelerationFactor
295      *            the factor set as -x% to the too slow client (maximal value being 0, meaning no
296      *            acceleration factor), default being -10% (-0.1).
297      */
298     public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) {
299         if (maxDeviation > MAX_DEVIATION) {
300             throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION);
301         }
302         if (slowDownFactor < 0) {
303             throw new IllegalArgumentException("slowDownFactor must be >= 0");
304         }
305         if (accelerationFactor > 0) {
306             throw new IllegalArgumentException("accelerationFactor must be <= 0");
307         }
308         this.maxDeviation = maxDeviation;
309         this.accelerationFactor = 1 + accelerationFactor;
310         this.slowDownFactor = 1 + slowDownFactor;
311     }
312 
313     private void computeDeviationCumulativeBytes() {
314         // compute the maximum cumulativeXxxxBytes among still connected Channels
315         long maxWrittenBytes = 0;
316         long maxReadBytes = 0;
317         long minWrittenBytes = Long.MAX_VALUE;
318         long minReadBytes = Long.MAX_VALUE;
319         for (PerChannel perChannel : channelQueues.values()) {
320             long value = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
321             if (maxWrittenBytes < value) {
322                 maxWrittenBytes = value;
323             }
324             if (minWrittenBytes > value) {
325                 minWrittenBytes = value;
326             }
327             value = perChannel.channelTrafficCounter.cumulativeReadBytes();
328             if (maxReadBytes < value) {
329                 maxReadBytes = value;
330             }
331             if (minReadBytes > value) {
332                 minReadBytes = value;
333             }
334         }
335         boolean multiple = channelQueues.size() > 1;
336         readDeviationActive = multiple && minReadBytes < maxReadBytes / 2;
337         writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2;
338         cumulativeWrittenBytes.set(maxWrittenBytes);
339         cumulativeReadBytes.set(maxReadBytes);
340     }
341 
342     @Override
343     protected void doAccounting(TrafficCounter counter) {
344         computeDeviationCumulativeBytes();
345         super.doAccounting(counter);
346     }
347 
348     private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) {
349         if (maxGlobal == 0) {
350             // no change
351             return wait;
352         }
353         float ratio = maxLocal / maxGlobal;
354         // if in the boundaries, same value
355         if (ratio > maxDeviation) {
356             if (ratio < 1 - maxDeviation) {
357                 return wait;
358             } else {
359                 ratio = slowDownFactor;
360                 if (wait < MINIMAL_WAIT) {
361                     wait = MINIMAL_WAIT;
362                 }
363             }
364         } else {
365             ratio = accelerationFactor;
366         }
367         return (long) (wait * ratio);
368     }
369 
370     /**
371      * @return the maxGlobalWriteSize
372      */
373     public long getMaxGlobalWriteSize() {
374         return maxGlobalWriteSize;
375     }
376 
377     /**
378      * Note the change will be taken as best effort, meaning
379      * that all already scheduled traffics will not be
380      * changed, but only applied to new traffics.<br>
381      * So the expected usage of this method is to be used not too often,
382      * accordingly to the traffic shaping configuration.
383      *
384      * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
385      *            globally for all channels before write suspended is set.
386      */
387     public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
388         if (maxGlobalWriteSize <= 0) {
389             throw new IllegalArgumentException("maxGlobalWriteSize must be positive");
390         }
391         this.maxGlobalWriteSize = maxGlobalWriteSize;
392     }
393 
394     /**
395      * @return the global size of the buffers for all queues.
396      */
397     public long queuesSize() {
398         return queuesSize.get();
399     }
400 
401     /**
402      * @param newWriteLimit Channel write limit
403      * @param newReadLimit Channel read limit
404      */
405     public void configureChannel(long newWriteLimit, long newReadLimit) {
406         writeChannelLimit = newWriteLimit;
407         readChannelLimit = newReadLimit;
408         long now = TrafficCounter.milliSecondFromNano();
409         for (PerChannel perChannel : channelQueues.values()) {
410             perChannel.channelTrafficCounter.resetAccounting(now);
411         }
412     }
413 
414     /**
415      * @return Channel write limit
416      */
417     public long getWriteChannelLimit() {
418         return writeChannelLimit;
419     }
420 
421     /**
422      * @param writeLimit Channel write limit
423      */
424     public void setWriteChannelLimit(long writeLimit) {
425         writeChannelLimit = writeLimit;
426         long now = TrafficCounter.milliSecondFromNano();
427         for (PerChannel perChannel : channelQueues.values()) {
428             perChannel.channelTrafficCounter.resetAccounting(now);
429         }
430     }
431 
432     /**
433      * @return Channel read limit
434      */
435     public long getReadChannelLimit() {
436         return readChannelLimit;
437     }
438 
439     /**
440      * @param readLimit Channel read limit
441      */
442     public void setReadChannelLimit(long readLimit) {
443         readChannelLimit = readLimit;
444         long now = TrafficCounter.milliSecondFromNano();
445         for (PerChannel perChannel : channelQueues.values()) {
446             perChannel.channelTrafficCounter.resetAccounting(now);
447         }
448     }
449 
450     /**
451      * Release all internal resources of this instance.
452      */
453     public final void release() {
454         trafficCounter.stop();
455     }
456 
457     private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
458         // ensure creation is limited to one thread per channel
459         Channel channel = ctx.channel();
460         Integer key = channel.hashCode();
461         PerChannel perChannel = channelQueues.get(key);
462         if (perChannel == null) {
463             perChannel = new PerChannel();
464             perChannel.messagesQueue = new ArrayDeque<ToSend>();
465             // Don't start it since managed through the Global one
466             perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" +
467                     ctx.channel().hashCode(), checkInterval);
468             perChannel.queueSize = 0L;
469             perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
470             perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
471             channelQueues.put(key, perChannel);
472         }
473         return perChannel;
474     }
475 
476     @Override
477     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
478         getOrSetPerChannel(ctx);
479         trafficCounter.resetCumulativeTime();
480         super.handlerAdded(ctx);
481     }
482 
483     @Override
484     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
485         trafficCounter.resetCumulativeTime();
486         Channel channel = ctx.channel();
487         Integer key = channel.hashCode();
488         PerChannel perChannel = channelQueues.remove(key);
489         if (perChannel != null) {
490             // write operations need synchronization
491             synchronized (perChannel) {
492                 if (channel.isActive()) {
493                     for (ToSend toSend : perChannel.messagesQueue) {
494                         long size = calculateSize(toSend.toSend);
495                         trafficCounter.bytesRealWriteFlowControl(size);
496                         perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
497                         perChannel.queueSize -= size;
498                         queuesSize.addAndGet(-size);
499                         ctx.write(toSend.toSend, toSend.promise);
500                     }
501                 } else {
502                     queuesSize.addAndGet(-perChannel.queueSize);
503                     for (ToSend toSend : perChannel.messagesQueue) {
504                         if (toSend.toSend instanceof ByteBuf) {
505                             ((ByteBuf) toSend.toSend).release();
506                         }
507                     }
508                 }
509                 perChannel.messagesQueue.clear();
510             }
511         }
512         releaseWriteSuspended(ctx);
513         releaseReadSuspended(ctx);
514         super.handlerRemoved(ctx);
515     }
516 
517     @Override
518     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
519         long size = calculateSize(msg);
520         long now = TrafficCounter.milliSecondFromNano();
521         if (size > 0) {
522             // compute the number of ms to wait before reopening the channel
523             long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
524             Integer key = ctx.channel().hashCode();
525             PerChannel perChannel = channelQueues.get(key);
526             long wait = 0;
527             if (perChannel != null) {
528                 wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
529                 if (readDeviationActive) {
530                     // now try to balance between the channels
531                     long maxLocalRead;
532                     maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
533                     long maxGlobalRead = cumulativeReadBytes.get();
534                     if (maxLocalRead <= 0) {
535                         maxLocalRead = 0;
536                     }
537                     if (maxGlobalRead < maxLocalRead) {
538                         maxGlobalRead = maxLocalRead;
539                     }
540                     wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
541                 }
542             }
543             if (wait < waitGlobal) {
544                 wait = waitGlobal;
545             }
546             wait = checkWaitReadTime(ctx, wait, now);
547             if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
548                 // time in order to try to limit the traffic
549                 // Only AutoRead AND HandlerActive True means Context Active
550                 ChannelConfig config = ctx.channel().config();
551                 if (logger.isDebugEnabled()) {
552                     logger.debug("Read Suspend: " + wait + ':' + config.isAutoRead() + ':'
553                             + isHandlerActive(ctx));
554                 }
555                 if (config.isAutoRead() && isHandlerActive(ctx)) {
556                     config.setAutoRead(false);
557                     ctx.attr(READ_SUSPENDED).set(true);
558                     // Create a Runnable to reactive the read if needed. If one was create before it will just be
559                     // reused to limit object creation
560                     Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
561                     Runnable reopenTask = attr.get();
562                     if (reopenTask == null) {
563                         reopenTask = new ReopenReadTimerTask(ctx);
564                         attr.set(reopenTask);
565                     }
566                     ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
567                     if (logger.isDebugEnabled()) {
568                         logger.debug("Suspend final status => " + config.isAutoRead() + ':'
569                                 + isHandlerActive(ctx) + " will reopened at: " + wait);
570                     }
571                 }
572             }
573         }
574         informReadOperation(ctx, now);
575         ctx.fireChannelRead(msg);
576     }
577 
578     @Override
579     protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
580         Integer key = ctx.channel().hashCode();
581         PerChannel perChannel = channelQueues.get(key);
582         if (perChannel != null) {
583             if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
584                 wait = maxTime;
585             }
586         }
587         return wait;
588     }
589 
590     @Override
591     protected void informReadOperation(final ChannelHandlerContext ctx, final long now) {
592         Integer key = ctx.channel().hashCode();
593         PerChannel perChannel = channelQueues.get(key);
594         if (perChannel != null) {
595             perChannel.lastReadTimestamp = now;
596         }
597     }
598 
599     private static final class ToSend {
600         final long relativeTimeAction;
601         final Object toSend;
602         final ChannelPromise promise;
603         final long size;
604 
605         private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
606             relativeTimeAction = delay;
607             this.toSend = toSend;
608             this.size = size;
609             this.promise = promise;
610         }
611     }
612 
613     protected long maximumCumulativeWrittenBytes() {
614         return cumulativeWrittenBytes.get();
615     }
616 
617     protected long maximumCumulativeReadBytes() {
618         return cumulativeReadBytes.get();
619     }
620 
621     /**
622      * To allow for instance doAccounting to use the TrafficCounter per channel.
623      * @return the list of TrafficCounters that exists at the time of the call.
624      */
625     public Collection<TrafficCounter> channelTrafficCounters() {
626         return new AbstractCollection<TrafficCounter>() {
627             @Override
628             public Iterator<TrafficCounter> iterator() {
629                 return new Iterator<TrafficCounter>() {
630                     final Iterator<PerChannel> iter = channelQueues.values().iterator();
631                     @Override
632                     public boolean hasNext() {
633                         return iter.hasNext();
634                     }
635                     @Override
636                     public TrafficCounter next() {
637                         return iter.next().channelTrafficCounter;
638                     }
639                     @Override
640                     public void remove() {
641                         throw new UnsupportedOperationException();
642                     }
643                 };
644             }
645             @Override
646             public int size() {
647                 return channelQueues.size();
648             }
649         };
650     }
651 
652     @Override
653     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
654             throws Exception {
655         long size = calculateSize(msg);
656         long now = TrafficCounter.milliSecondFromNano();
657         if (size > 0) {
658             // compute the number of ms to wait before continue with the channel
659             long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now);
660             Integer key = ctx.channel().hashCode();
661             PerChannel perChannel = channelQueues.get(key);
662             long wait = 0;
663             if (perChannel != null) {
664                 wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
665                 if (writeDeviationActive) {
666                     // now try to balance between the channels
667                     long maxLocalWrite;
668                     maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
669                     long maxGlobalWrite = cumulativeWrittenBytes.get();
670                     if (maxLocalWrite <= 0) {
671                         maxLocalWrite = 0;
672                     }
673                     if (maxGlobalWrite < maxLocalWrite) {
674                         maxGlobalWrite = maxLocalWrite;
675                     }
676                     wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait);
677                 }
678             }
679             if (wait < waitGlobal) {
680                 wait = waitGlobal;
681             }
682             if (wait >= MINIMAL_WAIT) {
683                 if (logger.isDebugEnabled()) {
684                     logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
685                             + isHandlerActive(ctx));
686                 }
687                 submitWrite(ctx, msg, size, wait, now, promise);
688                 return;
689             }
690         }
691         // to maintain order of write
692         submitWrite(ctx, msg, size, 0, now, promise);
693     }
694 
695     @Override
696     protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
697             final long size, final long writedelay, final long now,
698             final ChannelPromise promise) {
699         Channel channel = ctx.channel();
700         Integer key = channel.hashCode();
701         PerChannel perChannel = channelQueues.get(key);
702         if (perChannel == null) {
703             // in case write occurs before handlerAdded is raised for this handler
704             // imply a synchronized only if needed
705             perChannel = getOrSetPerChannel(ctx);
706         }
707         final ToSend newToSend;
708         long delay = writedelay;
709         boolean globalSizeExceeded = false;
710         // write operations need synchronization
711         synchronized (perChannel) {
712             if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
713                 trafficCounter.bytesRealWriteFlowControl(size);
714                 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
715                 ctx.write(msg, promise);
716                 perChannel.lastWriteTimestamp = now;
717                 return;
718             }
719             if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
720                 delay = maxTime;
721             }
722             newToSend = new ToSend(delay + now, msg, size, promise);
723             perChannel.messagesQueue.addLast(newToSend);
724             perChannel.queueSize += size;
725             queuesSize.addAndGet(size);
726             checkWriteSuspend(ctx, delay, perChannel.queueSize);
727             if (queuesSize.get() > maxGlobalWriteSize) {
728                 globalSizeExceeded = true;
729             }
730         }
731         if (globalSizeExceeded) {
732             setUserDefinedWritability(ctx, false);
733         }
734         final long futureNow = newToSend.relativeTimeAction;
735         final PerChannel forSchedule = perChannel;
736         ctx.executor().schedule(new Runnable() {
737             @Override
738             public void run() {
739                 sendAllValid(ctx, forSchedule, futureNow);
740             }
741         }, delay, TimeUnit.MILLISECONDS);
742     }
743 
744     private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
745         // write operations need synchronization
746         synchronized (perChannel) {
747             ToSend newToSend = perChannel.messagesQueue.pollFirst();
748             for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
749                 if (newToSend.relativeTimeAction <= now) {
750                     long size = newToSend.size;
751                     trafficCounter.bytesRealWriteFlowControl(size);
752                     perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
753                     perChannel.queueSize -= size;
754                     queuesSize.addAndGet(-size);
755                     ctx.write(newToSend.toSend, newToSend.promise);
756                     perChannel.lastWriteTimestamp = now;
757                 } else {
758                     perChannel.messagesQueue.addFirst(newToSend);
759                     break;
760                 }
761             }
762             if (perChannel.messagesQueue.isEmpty()) {
763                 releaseWriteSuspended(ctx);
764             }
765         }
766         ctx.flush();
767     }
768 
769     @Override
770     public String toString() {
771         return new StringBuilder(340).append(super.toString())
772             .append(" Write Channel Limit: ").append(writeChannelLimit)
773             .append(" Read Channel Limit: ").append(readChannelLimit).toString();
774     }
775 }