View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.traffic;
17  
18  import io.netty5.channel.Channel;
19  import io.netty5.channel.ChannelHandlerContext;
20  import io.netty5.channel.ChannelOption;
21  import io.netty5.util.Attribute;
22  import io.netty5.util.Resource;
23  import io.netty5.util.concurrent.EventExecutor;
24  import io.netty5.util.concurrent.EventExecutorGroup;
25  import io.netty5.util.concurrent.Future;
26  import io.netty5.util.concurrent.Promise;
27  import io.netty5.util.internal.logging.InternalLogger;
28  import io.netty5.util.internal.logging.InternalLoggerFactory;
29  
30  import java.util.AbstractCollection;
31  import java.util.ArrayDeque;
32  import java.util.Collection;
33  import java.util.Iterator;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  import static io.netty5.util.internal.ObjectUtil.checkNotNullWithIAE;
40  import static io.netty5.util.internal.ObjectUtil.checkPositive;
41  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
42  
43  /**
44   * This implementation of the {@link AbstractTrafficShapingHandler} is for global
45   * and per channel traffic shaping, that is to say a global limitation of the bandwidth, whatever
46   * the number of opened channels and a per channel limitation of the bandwidth.<br><br>
47   * This version shall not be in the same pipeline than other TrafficShapingHandler.<br><br>
48   *
49   * The general use should be as follow:<br>
50   * <ul>
51   * <li>Create your unique GlobalChannelTrafficShapingHandler like:<br><br>
52   * <tt>GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);</tt><br><br>
53   * The executor could be the underlying IO worker pool<br>
54   * <tt>pipeline.addLast(myHandler);</tt><br><br>
55   *
56   * <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
57   * and shared among all channels as the counter must be shared among all channels.</b><br><br>
58   *
59   * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
60   * or the check interval (in millisecond) that represents the delay between two computations of the
61   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br>
62   * Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets,
63   * respectively Global and Channel.<br><br>
64   *
65   * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
66   * it is recommended to set a positive value, even if it is high since the precision of the
67   * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
68   * the less precise the traffic shaping will be. It is suggested as higher value something close
69   * to 5 or 10 minutes.<br><br>
70   *
71   * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
72   * </li>
73   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
74   * {@code channelWritabilityChanged(ctx)} to handle writability, or through
75   * {@code future.addListener(future -> ...)} on the future returned by
76   * {@code ctx.write()}.</li>
77   * <li>You shall also consider to have object size in read or write operations relatively adapted to
78   * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
79   * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.<br><br></li>
80   * <li>Some configuration methods will be taken as best effort, meaning
81   * that all already scheduled traffics will not be
82   * changed, but only applied to new traffics.<br>
83   * So the expected usage of those methods are to be used not too often,
84   * accordingly to the traffic shaping configuration.</li>
85   * </ul><br>
86   *
87   * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
88   * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
89   */
90  public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
91      private static final InternalLogger logger =
92              InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
93      /**
94       * All queues per channel
95       */
96      final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<>();
97  
98      /**
99       * Global queues size
100      */
101     private final AtomicLong queuesSize = new AtomicLong();
102 
103     /**
104      * Maximum cumulative writing bytes for one channel among all (as long as channels stay the same)
105      */
106     private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
107 
108     /**
109      * Maximum cumulative read bytes for one channel among all (as long as channels stay the same)
110      */
111     private final AtomicLong cumulativeReadBytes = new AtomicLong();
112 
113     /**
114      * Max size in the list before proposing to stop writing new objects from next handlers
115      * for all channel (global)
116      */
117     volatile long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
118 
119     /**
120      * Limit in B/s to apply to write
121      */
122     private volatile long writeChannelLimit;
123 
124     /**
125      * Limit in B/s to apply to read
126      */
127     private volatile long readChannelLimit;
128 
129     private static final float DEFAULT_DEVIATION = 0.1F;
130     private static final float MAX_DEVIATION = 0.4F;
131     private static final float DEFAULT_SLOWDOWN = 0.4F;
132     private static final float DEFAULT_ACCELERATION = -0.1F;
133     private volatile float maxDeviation;
134     private volatile float accelerationFactor;
135     private volatile float slowDownFactor;
136     private volatile boolean readDeviationActive;
137     private volatile boolean writeDeviationActive;
138 
139     static final class PerChannel {
140         ArrayDeque<ToSend> messagesQueue;
141         TrafficCounter channelTrafficCounter;
142         long queueSize;
143         long lastWriteTimestamp;
144         long lastReadTimestamp;
145     }
146 
147     /**
148      * Create the global TrafficCounter
149      */
150     void createGlobalTrafficCounter(EventExecutorGroup executor) {
151         // Default
152         setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION);
153         checkNotNullWithIAE(executor, "executor");
154         TrafficCounter tc = new GlobalChannelTrafficCounter(this, executor, "GlobalChannelTC", checkInterval);
155         setTrafficCounter(tc);
156         tc.start();
157     }
158 
159     @Override
160     protected int userDefinedWritabilityIndex() {
161         return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
162     }
163 
164     /**
165      * Create a new instance.
166      *
167      * @param executor
168      *            the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
169      * @param writeGlobalLimit
170      *            0 or a limit in bytes/s
171      * @param readGlobalLimit
172      *            0 or a limit in bytes/s
173      * @param writeChannelLimit
174      *            0 or a limit in bytes/s
175      * @param readChannelLimit
176      *            0 or a limit in bytes/s
177      * @param checkInterval
178      *            The delay between two computations of performances for
179      *            channels or 0 if no stats are to be computed.
180      * @param maxTime
181      *            The maximum delay to wait in case of traffic excess.
182      */
183     public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor,
184             long writeGlobalLimit, long readGlobalLimit,
185             long writeChannelLimit, long readChannelLimit,
186             long checkInterval, long maxTime) {
187         super(writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
188         createGlobalTrafficCounter(executor);
189         this.writeChannelLimit = writeChannelLimit;
190         this.readChannelLimit = readChannelLimit;
191     }
192 
193     /**
194      * Create a new instance.
195      *
196      * @param executor
197      *          the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
198      * @param writeGlobalLimit
199      *            0 or a limit in bytes/s
200      * @param readGlobalLimit
201      *            0 or a limit in bytes/s
202      * @param writeChannelLimit
203      *            0 or a limit in bytes/s
204      * @param readChannelLimit
205      *            0 or a limit in bytes/s
206      * @param checkInterval
207      *          The delay between two computations of performances for
208      *            channels or 0 if no stats are to be computed.
209      */
210     public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor,
211             long writeGlobalLimit, long readGlobalLimit,
212             long writeChannelLimit, long readChannelLimit,
213             long checkInterval) {
214         super(writeGlobalLimit, readGlobalLimit, checkInterval);
215         this.writeChannelLimit = writeChannelLimit;
216         this.readChannelLimit = readChannelLimit;
217         createGlobalTrafficCounter(executor);
218     }
219 
220     /**
221      * Create a new instance.
222      *
223      * @param executor
224      *          the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
225      * @param writeGlobalLimit
226      *            0 or a limit in bytes/s
227      * @param readGlobalLimit
228      *            0 or a limit in bytes/s
229      * @param writeChannelLimit
230      *            0 or a limit in bytes/s
231      * @param readChannelLimit
232      *            0 or a limit in bytes/s
233      */
234     public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor,
235             long writeGlobalLimit, long readGlobalLimit,
236             long writeChannelLimit, long readChannelLimit) {
237         super(writeGlobalLimit, readGlobalLimit);
238         this.writeChannelLimit = writeChannelLimit;
239         this.readChannelLimit = readChannelLimit;
240         createGlobalTrafficCounter(executor);
241     }
242 
243     /**
244      * Create a new instance.
245      *
246      * @param executor
247      *          the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
248      * @param checkInterval
249      *          The delay between two computations of performances for
250      *            channels or 0 if no stats are to be computed.
251      */
252     public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long checkInterval) {
253         super(checkInterval);
254         createGlobalTrafficCounter(executor);
255     }
256 
257     /**
258      * Create a new instance.
259      *
260      * @param executor
261      *          the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
262      */
263     public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor) {
264         createGlobalTrafficCounter(executor);
265     }
266 
267     @Override
268     public boolean isSharable() {
269         return true;
270     }
271 
272     /**
273      * @return the current max deviation
274      */
275     public float maxDeviation() {
276         return maxDeviation;
277     }
278 
279     /**
280      * @return the current acceleration factor
281      */
282     public float accelerationFactor() {
283         return accelerationFactor;
284     }
285 
286     /**
287      * @return the current slow down factor
288      */
289     public float slowDownFactor() {
290         return slowDownFactor;
291     }
292 
293     /**
294      * @param maxDeviation
295      *            the maximum deviation to allow during computation of average, default deviation
296      *            being 0.1, so +/-10% of the desired bandwidth. Maximum being 0.4.
297      * @param slowDownFactor
298      *            the factor set as +x% to the too fast client (minimal value being 0, meaning no
299      *            slow down factor), default being 40% (0.4).
300      * @param accelerationFactor
301      *            the factor set as -x% to the too slow client (maximal value being 0, meaning no
302      *            acceleration factor), default being -10% (-0.1).
303      */
304     public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) {
305         if (maxDeviation > MAX_DEVIATION) {
306             throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION);
307         }
308         checkPositiveOrZero(slowDownFactor, "slowDownFactor");
309         if (accelerationFactor > 0) {
310             throw new IllegalArgumentException("accelerationFactor must be <= 0");
311         }
312         this.maxDeviation = maxDeviation;
313         this.accelerationFactor = 1 + accelerationFactor;
314         this.slowDownFactor = 1 + slowDownFactor;
315     }
316 
317     private void computeDeviationCumulativeBytes() {
318         // compute the maximum cumulativeXxxxBytes among still connected Channels
319         long maxWrittenBytes = 0;
320         long maxReadBytes = 0;
321         long minWrittenBytes = Long.MAX_VALUE;
322         long minReadBytes = Long.MAX_VALUE;
323         for (PerChannel perChannel : channelQueues.values()) {
324             long value = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
325             if (maxWrittenBytes < value) {
326                 maxWrittenBytes = value;
327             }
328             if (minWrittenBytes > value) {
329                 minWrittenBytes = value;
330             }
331             value = perChannel.channelTrafficCounter.cumulativeReadBytes();
332             if (maxReadBytes < value) {
333                 maxReadBytes = value;
334             }
335             if (minReadBytes > value) {
336                 minReadBytes = value;
337             }
338         }
339         boolean multiple = channelQueues.size() > 1;
340         readDeviationActive = multiple && minReadBytes < maxReadBytes / 2;
341         writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2;
342         cumulativeWrittenBytes.set(maxWrittenBytes);
343         cumulativeReadBytes.set(maxReadBytes);
344     }
345 
346     @Override
347     protected void doAccounting(TrafficCounter counter) {
348         computeDeviationCumulativeBytes();
349         super.doAccounting(counter);
350     }
351 
352     private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) {
353         if (maxGlobal == 0) {
354             // no change
355             return wait;
356         }
357         float ratio = maxLocal / maxGlobal;
358         // if in the boundaries, same value
359         if (ratio > maxDeviation) {
360             if (ratio < 1 - maxDeviation) {
361                 return wait;
362             } else {
363                 ratio = slowDownFactor;
364                 if (wait < MINIMAL_WAIT) {
365                     wait = MINIMAL_WAIT;
366                 }
367             }
368         } else {
369             ratio = accelerationFactor;
370         }
371         return (long) (wait * ratio);
372     }
373 
374     /**
375      * @return the maxGlobalWriteSize
376      */
377     public long getMaxGlobalWriteSize() {
378         return maxGlobalWriteSize;
379     }
380 
381     /**
382      * Note the change will be taken as best effort, meaning
383      * that all already scheduled traffics will not be
384      * changed, but only applied to new traffics.<br>
385      * So the expected usage of this method is to be used not too often,
386      * accordingly to the traffic shaping configuration.
387      *
388      * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
389      *            globally for all channels before write suspended is set.
390      */
391     public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
392         this.maxGlobalWriteSize = checkPositive(maxGlobalWriteSize, "maxGlobalWriteSize");
393     }
394 
395     /**
396      * @return the global size of the buffers for all queues.
397      */
398     public long queuesSize() {
399         return queuesSize.get();
400     }
401 
402     /**
403      * @param newWriteLimit Channel write limit
404      * @param newReadLimit Channel read limit
405      */
406     public void configureChannel(long newWriteLimit, long newReadLimit) {
407         writeChannelLimit = newWriteLimit;
408         readChannelLimit = newReadLimit;
409         long now = TrafficCounter.milliSecondFromNano();
410         for (PerChannel perChannel : channelQueues.values()) {
411             perChannel.channelTrafficCounter.resetAccounting(now);
412         }
413     }
414 
415     /**
416      * @return Channel write limit
417      */
418     public long getWriteChannelLimit() {
419         return writeChannelLimit;
420     }
421 
422     /**
423      * @param writeLimit Channel write limit
424      */
425     public void setWriteChannelLimit(long writeLimit) {
426         writeChannelLimit = writeLimit;
427         long now = TrafficCounter.milliSecondFromNano();
428         for (PerChannel perChannel : channelQueues.values()) {
429             perChannel.channelTrafficCounter.resetAccounting(now);
430         }
431     }
432 
433     /**
434      * @return Channel read limit
435      */
436     public long getReadChannelLimit() {
437         return readChannelLimit;
438     }
439 
440     /**
441      * @param readLimit Channel read limit
442      */
443     public void setReadChannelLimit(long readLimit) {
444         readChannelLimit = readLimit;
445         long now = TrafficCounter.milliSecondFromNano();
446         for (PerChannel perChannel : channelQueues.values()) {
447             perChannel.channelTrafficCounter.resetAccounting(now);
448         }
449     }
450 
451     /**
452      * Release all internal resources of this instance.
453      */
454     public final void release() {
455         trafficCounter.stop();
456     }
457 
458     private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
459         // ensure creation is limited to one thread per channel
460         Channel channel = ctx.channel();
461         Integer key = channel.hashCode();
462         PerChannel perChannel = channelQueues.get(key);
463         if (perChannel == null) {
464             perChannel = new PerChannel();
465             perChannel.messagesQueue = new ArrayDeque<>();
466             // Don't start it since managed through the Global one
467             perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" +
468                     ctx.channel().hashCode(), checkInterval);
469             perChannel.queueSize = 0L;
470             perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
471             perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
472             channelQueues.put(key, perChannel);
473         }
474         return perChannel;
475     }
476 
477     @Override
478     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
479         getOrSetPerChannel(ctx);
480         trafficCounter.resetCumulativeTime();
481         super.handlerAdded(ctx);
482     }
483 
484     @Override
485     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
486         trafficCounter.resetCumulativeTime();
487         Channel channel = ctx.channel();
488         Integer key = channel.hashCode();
489         PerChannel perChannel = channelQueues.remove(key);
490         if (perChannel != null) {
491             // write operations need synchronization
492             synchronized (perChannel) {
493                 if (channel.isActive()) {
494                     for (ToSend toSend : perChannel.messagesQueue) {
495                         long size = calculateSize(toSend.toSend);
496                         trafficCounter.bytesRealWriteFlowControl(size);
497                         perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
498                         perChannel.queueSize -= size;
499                         queuesSize.addAndGet(-size);
500                         ctx.write(toSend.toSend).cascadeTo(toSend.promise);
501                     }
502                 } else {
503                     queuesSize.addAndGet(-perChannel.queueSize);
504                     for (ToSend toSend : perChannel.messagesQueue) {
505                         if (Resource.isAccessible(toSend.toSend, false)) {
506                             Resource.dispose(toSend.toSend);
507                         }
508                     }
509                 }
510                 perChannel.messagesQueue.clear();
511             }
512         }
513         releaseWriteSuspended(ctx);
514         releaseReadSuspended(ctx);
515         super.handlerRemoved(ctx);
516     }
517 
518     @Override
519     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
520         long size = calculateSize(msg);
521         long now = TrafficCounter.milliSecondFromNano();
522         if (size > 0) {
523             // compute the number of ms to wait before reopening the channel
524             long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
525             Integer key = ctx.channel().hashCode();
526             PerChannel perChannel = channelQueues.get(key);
527             long wait = 0;
528             if (perChannel != null) {
529                 wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
530                 if (readDeviationActive) {
531                     // now try to balance between the channels
532                     long maxLocalRead;
533                     maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
534                     long maxGlobalRead = cumulativeReadBytes.get();
535                     if (maxLocalRead <= 0) {
536                         maxLocalRead = 0;
537                     }
538                     if (maxGlobalRead < maxLocalRead) {
539                         maxGlobalRead = maxLocalRead;
540                     }
541                     wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
542                 }
543             }
544             if (wait < waitGlobal) {
545                 wait = waitGlobal;
546             }
547             wait = checkWaitReadTime(ctx, wait, now);
548             if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
549                 // time in order to try to limit the traffic
550                 // Only AutoRead AND HandlerActive True means Context Active
551                 Channel channel = ctx.channel();
552                 if (logger.isDebugEnabled()) {
553                     logger.debug("Read Suspend: " + wait + ':' + channel.getOption(ChannelOption.AUTO_READ) + ':'
554                             + isHandlerActive(ctx));
555                 }
556                 if (channel.getOption(ChannelOption.AUTO_READ) && isHandlerActive(ctx)) {
557                     channel.setOption(ChannelOption.AUTO_READ, false);
558                     channel.attr(READ_SUSPENDED).set(true);
559                     // Create a Runnable to reactive the read if needed. If one was create before it will just be
560                     // reused to limit object creation
561                     Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
562                     Runnable reopenTask = attr.get();
563                     if (reopenTask == null) {
564                         reopenTask = new ReopenReadTimerTask(ctx);
565                         attr.set(reopenTask);
566                     }
567                     ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
568                     if (logger.isDebugEnabled()) {
569                         logger.debug("Suspend final status => " + channel.getOption(ChannelOption.AUTO_READ) + ':'
570                                 + isHandlerActive(ctx) + " will reopened at: " + wait);
571                     }
572                 }
573             }
574         }
575         informReadOperation(ctx, now);
576         ctx.fireChannelRead(msg);
577     }
578 
579     @Override
580     protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
581         Integer key = ctx.channel().hashCode();
582         PerChannel perChannel = channelQueues.get(key);
583         if (perChannel != null) {
584             if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
585                 wait = maxTime;
586             }
587         }
588         return wait;
589     }
590 
591     @Override
592     protected void informReadOperation(final ChannelHandlerContext ctx, final long now) {
593         Integer key = ctx.channel().hashCode();
594         PerChannel perChannel = channelQueues.get(key);
595         if (perChannel != null) {
596             perChannel.lastReadTimestamp = now;
597         }
598     }
599 
600     private static final class ToSend {
601         final long relativeTimeAction;
602         final Object toSend;
603         final Promise<Void> promise;
604         final long size;
605 
606         private ToSend(final long delay, final Object toSend, final long size, final Promise<Void> promise) {
607             relativeTimeAction = delay;
608             this.toSend = toSend;
609             this.size = size;
610             this.promise = promise;
611         }
612     }
613 
614     protected long maximumCumulativeWrittenBytes() {
615         return cumulativeWrittenBytes.get();
616     }
617 
618     protected long maximumCumulativeReadBytes() {
619         return cumulativeReadBytes.get();
620     }
621 
622     /**
623      * To allow for instance doAccounting to use the TrafficCounter per channel.
624      * @return the list of TrafficCounters that exists at the time of the call.
625      */
626     public Collection<TrafficCounter> channelTrafficCounters() {
627         return new AbstractCollection<>() {
628             @Override
629             public Iterator<TrafficCounter> iterator() {
630                 return new Iterator<>() {
631                     final Iterator<PerChannel> iter = channelQueues.values().iterator();
632 
633                     @Override
634                     public boolean hasNext() {
635                         return iter.hasNext();
636                     }
637 
638                     @Override
639                     public TrafficCounter next() {
640                         return iter.next().channelTrafficCounter;
641                     }
642 
643                     @Override
644                     public void remove() {
645                         throw new UnsupportedOperationException();
646                     }
647                 };
648             }
649 
650             @Override
651             public int size() {
652                 return channelQueues.size();
653             }
654         };
655     }
656 
657     @Override
658     public Future<Void> write(final ChannelHandlerContext ctx, final Object msg) {
659         long size = calculateSize(msg);
660         long now = TrafficCounter.milliSecondFromNano();
661         if (size > 0) {
662             // compute the number of ms to wait before continue with the channel
663             long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now);
664             Integer key = ctx.channel().hashCode();
665             PerChannel perChannel = channelQueues.get(key);
666             long wait = 0;
667             if (perChannel != null) {
668                 wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
669                 if (writeDeviationActive) {
670                     // now try to balance between the channels
671                     long maxLocalWrite;
672                     maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
673                     long maxGlobalWrite = cumulativeWrittenBytes.get();
674                     if (maxLocalWrite <= 0) {
675                         maxLocalWrite = 0;
676                     }
677                     if (maxGlobalWrite < maxLocalWrite) {
678                         maxGlobalWrite = maxLocalWrite;
679                     }
680                     wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait);
681                 }
682             }
683             if (wait < waitGlobal) {
684                 wait = waitGlobal;
685             }
686             if (wait >= MINIMAL_WAIT) {
687                 if (logger.isDebugEnabled()) {
688                     logger.debug("Write suspend: " + wait + ':' + ctx.channel().getOption(ChannelOption.AUTO_READ) + ':'
689                             + isHandlerActive(ctx));
690                 }
691                 Promise<Void> promise = ctx.newPromise();
692                 submitWrite(ctx, msg, size, wait, now, promise);
693                 return promise.asFuture();
694             }
695         }
696         Promise<Void> promise = ctx.newPromise();
697         // to maintain order of write
698         submitWrite(ctx, msg, size, 0, now, promise);
699         return promise.asFuture();
700     }
701 
702     @Override
703     protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
704             final long size, final long writedelay, final long now,
705             final Promise<Void> promise) {
706         Channel channel = ctx.channel();
707         Integer key = channel.hashCode();
708         PerChannel perChannel = channelQueues.get(key);
709         if (perChannel == null) {
710             // in case write occurs before handlerAdded is raised for this handler
711             // imply a synchronized only if needed
712             perChannel = getOrSetPerChannel(ctx);
713         }
714         final ToSend newToSend;
715         long delay = writedelay;
716         boolean globalSizeExceeded = false;
717         // write operations need synchronization
718         synchronized (perChannel) {
719             if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
720                 trafficCounter.bytesRealWriteFlowControl(size);
721                 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
722                 ctx.write(msg).cascadeTo(promise);
723                 perChannel.lastWriteTimestamp = now;
724                 return;
725             }
726             if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
727                 delay = maxTime;
728             }
729             newToSend = new ToSend(delay + now, msg, size, promise);
730             perChannel.messagesQueue.addLast(newToSend);
731             perChannel.queueSize += size;
732             queuesSize.addAndGet(size);
733             checkWriteSuspend(ctx, delay, perChannel.queueSize);
734             if (queuesSize.get() > maxGlobalWriteSize) {
735                 globalSizeExceeded = true;
736             }
737         }
738         if (globalSizeExceeded) {
739             setUserDefinedWritability(ctx, false);
740         }
741         final long futureNow = newToSend.relativeTimeAction;
742         final PerChannel forSchedule = perChannel;
743         ctx.executor().schedule(() -> sendAllValid(ctx, forSchedule, futureNow), delay, TimeUnit.MILLISECONDS);
744     }
745 
746     private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
747         // write operations need synchronization
748         synchronized (perChannel) {
749             ToSend newToSend = perChannel.messagesQueue.pollFirst();
750             for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
751                 if (newToSend.relativeTimeAction <= now) {
752                     long size = newToSend.size;
753                     trafficCounter.bytesRealWriteFlowControl(size);
754                     perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
755                     perChannel.queueSize -= size;
756                     queuesSize.addAndGet(-size);
757                     ctx.write(newToSend.toSend).cascadeTo(newToSend.promise);
758                     perChannel.lastWriteTimestamp = now;
759                 } else {
760                     perChannel.messagesQueue.addFirst(newToSend);
761                     break;
762                 }
763             }
764             if (perChannel.messagesQueue.isEmpty()) {
765                 releaseWriteSuspended(ctx);
766             }
767         }
768         ctx.flush();
769     }
770 
771     @Override
772     public String toString() {
773         return new StringBuilder(340).append(super.toString())
774             .append(" Write Channel Limit: ").append(writeChannelLimit)
775             .append(" Read Channel Limit: ").append(readChannelLimit).toString();
776     }
777 }