View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE;
19  import static io.netty.util.internal.ObjectUtil.checkPositive;
20  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21  
22  import io.netty.buffer.ByteBuf;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelHandler.Sharable;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelHandlerContext;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.util.Attribute;
29  import io.netty.util.concurrent.EventExecutor;
30  import io.netty.util.internal.PlatformDependent;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.util.AbstractCollection;
35  import java.util.ArrayDeque;
36  import java.util.Collection;
37  import java.util.Iterator;
38  import java.util.concurrent.ConcurrentMap;
39  import java.util.concurrent.ScheduledExecutorService;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicLong;
42  
43  /**
44   * This implementation of the {@link AbstractTrafficShapingHandler} is for global
45   * and per channel traffic shaping, that is to say a global limitation of the bandwidth, whatever
46   * the number of opened channels and a per channel limitation of the bandwidth.<br><br>
47   * This version shall not be in the same pipeline as any other TrafficShapingHandler.<br><br>
48   *
49   * The general use should be as follows:<br>
50   * <ul>
51   * <li>Create your unique GlobalChannelTrafficShapingHandler like:<br><br>
52   * <tt>GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);</tt><br><br>
53   * The executor could be the underlying IO worker pool<br>
54   * <tt>pipeline.addLast(myHandler);</tt><br><br>
55   *
56   * <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
57   * and shared among all channels as the counter must be shared among all channels.</b><br><br>
58   *
59   * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
60   * or the check interval (in millisecond) that represents the delay between two computations of the
61   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br>
62   * Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets,
63   * respectively Global and Channel.<br><br>
64   *
65   * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
66   * it is recommended to set a positive value, even if it is high since the precision of the
67   * Traffic Shaping depends on the period over which the traffic is computed. The higher the interval,
68   * the less precise the traffic shaping will be. As an upper value, something close
69   * to 5 or 10 minutes is suggested.<br><br>
70   *
71   * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
72   * </li>
73   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
74   * {@code channelWritabilityChanged(ctx)} to handle writability, or through
75   * {@code future.addListener(new GenericFutureListener())} on the future returned by
76   * {@code ctx.write()}.</li>
77   * <li>You shall also consider to have object size in read or write operations relatively adapted to
78   * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
79   * while having 100 KB objects for 1 MB/s should be smoothly handled by this TrafficShaping handler.<br><br></li>
80   * <li>Some configuration methods will be taken as best effort, meaning
81   * that all already scheduled traffics will not be
82   * changed, but only applied to new traffics.<br>
83   * So the expected usage of those methods are to be used not too often,
84   * accordingly to the traffic shaping configuration.</li>
85   * </ul><br>
86   *
87   * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
88   * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
89   */
90  @Sharable
91  public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
92      private static final InternalLogger logger =
93              InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
94      /**
95       * All queues per channel
96       */
97      final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
98  
99      /**
100      * Global queues size
101      */
102     private final AtomicLong queuesSize = new AtomicLong();
103 
104     /**
105      * Maximum cumulative writing bytes for one channel among all (as long as channels stay the same)
106      */
107     private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
108 
109     /**
110      * Maximum cumulative read bytes for one channel among all (as long as channels stay the same)
111      */
112     private final AtomicLong cumulativeReadBytes = new AtomicLong();
113 
114     /**
115      * Max size in the list before proposing to stop writing new objects from next handlers
116      * for all channel (global)
117      */
118     volatile long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
119 
120     /**
121      * Limit in B/s to apply to write
122      */
123     private volatile long writeChannelLimit;
124 
125     /**
126      * Limit in B/s to apply to read
127      */
128     private volatile long readChannelLimit;
129 
130     private static final float DEFAULT_DEVIATION = 0.1F;
131     private static final float MAX_DEVIATION = 0.4F;
132     private static final float DEFAULT_SLOWDOWN = 0.4F;
133     private static final float DEFAULT_ACCELERATION = -0.1F;
134     private volatile float maxDeviation;
135     private volatile float accelerationFactor;
136     private volatile float slowDownFactor;
137     private volatile boolean readDeviationActive;
138     private volatile boolean writeDeviationActive;
139 
140     static final class PerChannel {
141         ArrayDeque<ToSend> messagesQueue;
142         TrafficCounter channelTrafficCounter;
143         long queueSize;
144         long lastWriteTimestamp;
145         long lastReadTimestamp;
146     }
147 
148     /**
149      * Create the global TrafficCounter
150      */
151     void createGlobalTrafficCounter(ScheduledExecutorService executor) {
152         // Default
153         setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION);
154         checkNotNullWithIAE(executor, "executor");
155         TrafficCounter tc = new GlobalChannelTrafficCounter(this, executor, "GlobalChannelTC", checkInterval);
156         setTrafficCounter(tc);
157         tc.start();
158     }
159 
160     @Override
161     protected int userDefinedWritabilityIndex() {
162         return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
163     }
164 
165     /**
166      * Create a new instance.
167      *
168      * @param executor
169      *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
170      * @param writeGlobalLimit
171      *            0 or a limit in bytes/s
172      * @param readGlobalLimit
173      *            0 or a limit in bytes/s
174      * @param writeChannelLimit
175      *            0 or a limit in bytes/s
176      * @param readChannelLimit
177      *            0 or a limit in bytes/s
178      * @param checkInterval
179      *            The delay between two computations of performances for
180      *            channels or 0 if no stats are to be computed.
181      * @param maxTime
182      *            The maximum delay to wait in case of traffic excess.
183      */
184     public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
185             long writeGlobalLimit, long readGlobalLimit,
186             long writeChannelLimit, long readChannelLimit,
187             long checkInterval, long maxTime) {
188         super(writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
189         createGlobalTrafficCounter(executor);
190         this.writeChannelLimit = writeChannelLimit;
191         this.readChannelLimit = readChannelLimit;
192     }
193 
194     /**
195      * Create a new instance.
196      *
197      * @param executor
198      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
199      * @param writeGlobalLimit
200      *            0 or a limit in bytes/s
201      * @param readGlobalLimit
202      *            0 or a limit in bytes/s
203      * @param writeChannelLimit
204      *            0 or a limit in bytes/s
205      * @param readChannelLimit
206      *            0 or a limit in bytes/s
207      * @param checkInterval
208      *          The delay between two computations of performances for
209      *            channels or 0 if no stats are to be computed.
210      */
211     public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
212             long writeGlobalLimit, long readGlobalLimit,
213             long writeChannelLimit, long readChannelLimit,
214             long checkInterval) {
215         super(writeGlobalLimit, readGlobalLimit, checkInterval);
216         this.writeChannelLimit = writeChannelLimit;
217         this.readChannelLimit = readChannelLimit;
218         createGlobalTrafficCounter(executor);
219     }
220 
221     /**
222      * Create a new instance.
223      *
224      * @param executor
225      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
226      * @param writeGlobalLimit
227      *            0 or a limit in bytes/s
228      * @param readGlobalLimit
229      *            0 or a limit in bytes/s
230      * @param writeChannelLimit
231      *            0 or a limit in bytes/s
232      * @param readChannelLimit
233      *            0 or a limit in bytes/s
234      */
235     public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
236             long writeGlobalLimit, long readGlobalLimit,
237             long writeChannelLimit, long readChannelLimit) {
238         super(writeGlobalLimit, readGlobalLimit);
239         this.writeChannelLimit = writeChannelLimit;
240         this.readChannelLimit = readChannelLimit;
241         createGlobalTrafficCounter(executor);
242     }
243 
244     /**
245      * Create a new instance.
246      *
247      * @param executor
248      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
249      * @param checkInterval
250      *          The delay between two computations of performances for
251      *            channels or 0 if no stats are to be computed.
252      */
253     public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
254         super(checkInterval);
255         createGlobalTrafficCounter(executor);
256     }
257 
258     /**
259      * Create a new instance.
260      *
261      * @param executor
262      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
263      */
264     public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) {
265         createGlobalTrafficCounter(executor);
266     }
267 
268     /**
269      * @return the current max deviation
270      */
271     public float maxDeviation() {
272         return maxDeviation;
273     }
274 
275     /**
276      * @return the current acceleration factor
277      */
278     public float accelerationFactor() {
279         return accelerationFactor;
280     }
281 
282     /**
283      * @return the current slow down factor
284      */
285     public float slowDownFactor() {
286         return slowDownFactor;
287     }
288 
289     /**
290      * @param maxDeviation
291      *            the maximum deviation to allow during computation of average, default deviation
292      *            being 0.1, so +/-10% of the desired bandwidth. Maximum being 0.4.
293      * @param slowDownFactor
294      *            the factor set as +x% to the too fast client (minimal value being 0, meaning no
295      *            slow down factor), default being 40% (0.4).
296      * @param accelerationFactor
297      *            the factor set as -x% to the too slow client (maximal value being 0, meaning no
298      *            acceleration factor), default being -10% (-0.1).
299      */
300     public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) {
301         if (maxDeviation > MAX_DEVIATION) {
302             throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION);
303         }
304         checkPositiveOrZero(slowDownFactor, "slowDownFactor");
305         if (accelerationFactor > 0) {
306             throw new IllegalArgumentException("accelerationFactor must be <= 0");
307         }
308         this.maxDeviation = maxDeviation;
309         this.accelerationFactor = 1 + accelerationFactor;
310         this.slowDownFactor = 1 + slowDownFactor;
311     }
312 
313     private void computeDeviationCumulativeBytes() {
314         // compute the maximum cumulativeXxxxBytes among still connected Channels
315         long maxWrittenBytes = 0;
316         long maxReadBytes = 0;
317         long minWrittenBytes = Long.MAX_VALUE;
318         long minReadBytes = Long.MAX_VALUE;
319         for (PerChannel perChannel : channelQueues.values()) {
320             long value = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
321             if (maxWrittenBytes < value) {
322                 maxWrittenBytes = value;
323             }
324             if (minWrittenBytes > value) {
325                 minWrittenBytes = value;
326             }
327             value = perChannel.channelTrafficCounter.cumulativeReadBytes();
328             if (maxReadBytes < value) {
329                 maxReadBytes = value;
330             }
331             if (minReadBytes > value) {
332                 minReadBytes = value;
333             }
334         }
335         boolean multiple = channelQueues.size() > 1;
336         readDeviationActive = multiple && minReadBytes < maxReadBytes / 2;
337         writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2;
338         cumulativeWrittenBytes.set(maxWrittenBytes);
339         cumulativeReadBytes.set(maxReadBytes);
340     }
341 
342     @Override
343     protected void doAccounting(TrafficCounter counter) {
344         computeDeviationCumulativeBytes();
345         super.doAccounting(counter);
346     }
347 
348     private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) {
349         if (maxGlobal == 0) {
350             // no change
351             return wait;
352         }
353         float ratio = maxLocal / maxGlobal;
354         // if in the boundaries, same value
355         if (ratio > maxDeviation) {
356             if (ratio < 1 - maxDeviation) {
357                 return wait;
358             } else {
359                 ratio = slowDownFactor;
360                 if (wait < MINIMAL_WAIT) {
361                     wait = MINIMAL_WAIT;
362                 }
363             }
364         } else {
365             ratio = accelerationFactor;
366         }
367         return (long) (wait * ratio);
368     }
369 
370     /**
371      * @return the maxGlobalWriteSize
372      */
373     public long getMaxGlobalWriteSize() {
374         return maxGlobalWriteSize;
375     }
376 
377     /**
378      * Note the change will be taken as best effort, meaning
379      * that all already scheduled traffics will not be
380      * changed, but only applied to new traffics.<br>
381      * So the expected usage of this method is to be used not too often,
382      * accordingly to the traffic shaping configuration.
383      *
384      * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
385      *            globally for all channels before write suspended is set.
386      */
387     public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
388         this.maxGlobalWriteSize = checkPositive(maxGlobalWriteSize, "maxGlobalWriteSize");
389     }
390 
391     /**
392      * @return the global size of the buffers for all queues.
393      */
394     public long queuesSize() {
395         return queuesSize.get();
396     }
397 
398     /**
399      * @param newWriteLimit Channel write limit
400      * @param newReadLimit Channel read limit
401      */
402     public void configureChannel(long newWriteLimit, long newReadLimit) {
403         writeChannelLimit = newWriteLimit;
404         readChannelLimit = newReadLimit;
405         long now = TrafficCounter.milliSecondFromNano();
406         for (PerChannel perChannel : channelQueues.values()) {
407             perChannel.channelTrafficCounter.resetAccounting(now);
408         }
409     }
410 
411     /**
412      * @return Channel write limit
413      */
414     public long getWriteChannelLimit() {
415         return writeChannelLimit;
416     }
417 
418     /**
419      * @param writeLimit Channel write limit
420      */
421     public void setWriteChannelLimit(long writeLimit) {
422         writeChannelLimit = writeLimit;
423         long now = TrafficCounter.milliSecondFromNano();
424         for (PerChannel perChannel : channelQueues.values()) {
425             perChannel.channelTrafficCounter.resetAccounting(now);
426         }
427     }
428 
429     /**
430      * @return Channel read limit
431      */
432     public long getReadChannelLimit() {
433         return readChannelLimit;
434     }
435 
436     /**
437      * @param readLimit Channel read limit
438      */
439     public void setReadChannelLimit(long readLimit) {
440         readChannelLimit = readLimit;
441         long now = TrafficCounter.milliSecondFromNano();
442         for (PerChannel perChannel : channelQueues.values()) {
443             perChannel.channelTrafficCounter.resetAccounting(now);
444         }
445     }
446 
447     /**
448      * Release all internal resources of this instance.
449      */
450     public final void release() {
451         trafficCounter.stop();
452     }
453 
454     private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
455         // ensure creation is limited to one thread per channel
456         Channel channel = ctx.channel();
457         Integer key = channel.hashCode();
458         PerChannel perChannel = channelQueues.get(key);
459         if (perChannel == null) {
460             perChannel = new PerChannel();
461             perChannel.messagesQueue = new ArrayDeque<ToSend>();
462             // Don't start it since managed through the Global one
463             perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" +
464                     ctx.channel().hashCode(), checkInterval);
465             perChannel.queueSize = 0L;
466             perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
467             perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
468             channelQueues.put(key, perChannel);
469         }
470         return perChannel;
471     }
472 
473     @Override
474     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
475         getOrSetPerChannel(ctx);
476         trafficCounter.resetCumulativeTime();
477         super.handlerAdded(ctx);
478     }
479 
480     @Override
481     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
482         trafficCounter.resetCumulativeTime();
483         Channel channel = ctx.channel();
484         Integer key = channel.hashCode();
485         PerChannel perChannel = channelQueues.remove(key);
486         if (perChannel != null) {
487             // write operations need synchronization
488             synchronized (perChannel) {
489                 if (channel.isActive()) {
490                     for (ToSend toSend : perChannel.messagesQueue) {
491                         long size = calculateSize(toSend.toSend);
492                         trafficCounter.bytesRealWriteFlowControl(size);
493                         perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
494                         perChannel.queueSize -= size;
495                         queuesSize.addAndGet(-size);
496                         ctx.write(toSend.toSend, toSend.promise);
497                     }
498                 } else {
499                     queuesSize.addAndGet(-perChannel.queueSize);
500                     for (ToSend toSend : perChannel.messagesQueue) {
501                         if (toSend.toSend instanceof ByteBuf) {
502                             ((ByteBuf) toSend.toSend).release();
503                         }
504                     }
505                 }
506                 perChannel.messagesQueue.clear();
507             }
508         }
509         releaseWriteSuspended(ctx);
510         releaseReadSuspended(ctx);
511         super.handlerRemoved(ctx);
512     }
513 
514     @Override
515     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
516         long size = calculateSize(msg);
517         long now = TrafficCounter.milliSecondFromNano();
518         if (size > 0) {
519             // compute the number of ms to wait before reopening the channel
520             long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
521             Integer key = ctx.channel().hashCode();
522             PerChannel perChannel = channelQueues.get(key);
523             long wait = 0;
524             if (perChannel != null) {
525                 wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
526                 if (readDeviationActive) {
527                     // now try to balance between the channels
528                     long maxLocalRead;
529                     maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
530                     long maxGlobalRead = cumulativeReadBytes.get();
531                     if (maxLocalRead <= 0) {
532                         maxLocalRead = 0;
533                     }
534                     if (maxGlobalRead < maxLocalRead) {
535                         maxGlobalRead = maxLocalRead;
536                     }
537                     wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
538                 }
539             }
540             if (wait < waitGlobal) {
541                 wait = waitGlobal;
542             }
543             wait = checkWaitReadTime(ctx, wait, now);
544             if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
545                 // time in order to try to limit the traffic
546                 // Only AutoRead AND HandlerActive True means Context Active
547                 Channel channel = ctx.channel();
548                 ChannelConfig config = channel.config();
549                 if (logger.isDebugEnabled()) {
550                     logger.debug("Read Suspend: " + wait + ':' + config.isAutoRead() + ':'
551                             + isHandlerActive(ctx));
552                 }
553                 if (config.isAutoRead() && isHandlerActive(ctx)) {
554                     config.setAutoRead(false);
555                     channel.attr(READ_SUSPENDED).set(true);
556                     // Create a Runnable to reactive the read if needed. If one was create before it will just be
557                     // reused to limit object creation
558                     Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
559                     Runnable reopenTask = attr.get();
560                     if (reopenTask == null) {
561                         reopenTask = new ReopenReadTimerTask(ctx);
562                         attr.set(reopenTask);
563                     }
564                     ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
565                     if (logger.isDebugEnabled()) {
566                         logger.debug("Suspend final status => " + config.isAutoRead() + ':'
567                                 + isHandlerActive(ctx) + " will reopened at: " + wait);
568                     }
569                 }
570             }
571         }
572         informReadOperation(ctx, now);
573         ctx.fireChannelRead(msg);
574     }
575 
576     @Override
577     protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
578         Integer key = ctx.channel().hashCode();
579         PerChannel perChannel = channelQueues.get(key);
580         if (perChannel != null) {
581             if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
582                 wait = maxTime;
583             }
584         }
585         return wait;
586     }
587 
588     @Override
589     protected void informReadOperation(final ChannelHandlerContext ctx, final long now) {
590         Integer key = ctx.channel().hashCode();
591         PerChannel perChannel = channelQueues.get(key);
592         if (perChannel != null) {
593             perChannel.lastReadTimestamp = now;
594         }
595     }
596 
597     private static final class ToSend {
598         final long relativeTimeAction;
599         final Object toSend;
600         final ChannelPromise promise;
601         final long size;
602 
603         private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
604             relativeTimeAction = delay;
605             this.toSend = toSend;
606             this.size = size;
607             this.promise = promise;
608         }
609     }
610 
611     protected long maximumCumulativeWrittenBytes() {
612         return cumulativeWrittenBytes.get();
613     }
614 
615     protected long maximumCumulativeReadBytes() {
616         return cumulativeReadBytes.get();
617     }
618 
619     /**
620      * To allow for instance doAccounting to use the TrafficCounter per channel.
621      * @return the list of TrafficCounters that exists at the time of the call.
622      */
623     public Collection<TrafficCounter> channelTrafficCounters() {
624         return new AbstractCollection<TrafficCounter>() {
625             @Override
626             public Iterator<TrafficCounter> iterator() {
627                 return new Iterator<TrafficCounter>() {
628                     final Iterator<PerChannel> iter = channelQueues.values().iterator();
629                     @Override
630                     public boolean hasNext() {
631                         return iter.hasNext();
632                     }
633                     @Override
634                     public TrafficCounter next() {
635                         return iter.next().channelTrafficCounter;
636                     }
637                     @Override
638                     public void remove() {
639                         throw new UnsupportedOperationException();
640                     }
641                 };
642             }
643             @Override
644             public int size() {
645                 return channelQueues.size();
646             }
647         };
648     }
649 
650     @Override
651     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
652             throws Exception {
653         long size = calculateSize(msg);
654         long now = TrafficCounter.milliSecondFromNano();
655         if (size > 0) {
656             // compute the number of ms to wait before continue with the channel
657             long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now);
658             Integer key = ctx.channel().hashCode();
659             PerChannel perChannel = channelQueues.get(key);
660             long wait = 0;
661             if (perChannel != null) {
662                 wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
663                 if (writeDeviationActive) {
664                     // now try to balance between the channels
665                     long maxLocalWrite;
666                     maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
667                     long maxGlobalWrite = cumulativeWrittenBytes.get();
668                     if (maxLocalWrite <= 0) {
669                         maxLocalWrite = 0;
670                     }
671                     if (maxGlobalWrite < maxLocalWrite) {
672                         maxGlobalWrite = maxLocalWrite;
673                     }
674                     wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait);
675                 }
676             }
677             if (wait < waitGlobal) {
678                 wait = waitGlobal;
679             }
680             if (wait >= MINIMAL_WAIT) {
681                 if (logger.isDebugEnabled()) {
682                     logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
683                             + isHandlerActive(ctx));
684                 }
685                 submitWrite(ctx, msg, size, wait, now, promise);
686                 return;
687             }
688         }
689         // to maintain order of write
690         submitWrite(ctx, msg, size, 0, now, promise);
691     }
692 
693     @Override
694     protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
695             final long size, final long writedelay, final long now,
696             final ChannelPromise promise) {
697         Channel channel = ctx.channel();
698         Integer key = channel.hashCode();
699         PerChannel perChannel = channelQueues.get(key);
700         if (perChannel == null) {
701             // in case write occurs before handlerAdded is raised for this handler
702             // imply a synchronized only if needed
703             perChannel = getOrSetPerChannel(ctx);
704         }
705         final ToSend newToSend;
706         long delay = writedelay;
707         boolean globalSizeExceeded = false;
708         // write operations need synchronization
709         synchronized (perChannel) {
710             if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
711                 trafficCounter.bytesRealWriteFlowControl(size);
712                 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
713                 ctx.write(msg, promise);
714                 perChannel.lastWriteTimestamp = now;
715                 return;
716             }
717             if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
718                 delay = maxTime;
719             }
720             newToSend = new ToSend(delay + now, msg, size, promise);
721             perChannel.messagesQueue.addLast(newToSend);
722             perChannel.queueSize += size;
723             queuesSize.addAndGet(size);
724             checkWriteSuspend(ctx, delay, perChannel.queueSize);
725             if (queuesSize.get() > maxGlobalWriteSize) {
726                 globalSizeExceeded = true;
727             }
728         }
729         if (globalSizeExceeded) {
730             setUserDefinedWritability(ctx, false);
731         }
732         final long futureNow = newToSend.relativeTimeAction;
733         final PerChannel forSchedule = perChannel;
734         ctx.executor().schedule(new Runnable() {
735             @Override
736             public void run() {
737                 sendAllValid(ctx, forSchedule, futureNow);
738             }
739         }, delay, TimeUnit.MILLISECONDS);
740     }
741 
742     private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
743         // write operations need synchronization
744         synchronized (perChannel) {
745             ToSend newToSend = perChannel.messagesQueue.pollFirst();
746             for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
747                 if (newToSend.relativeTimeAction <= now) {
748                     long size = newToSend.size;
749                     trafficCounter.bytesRealWriteFlowControl(size);
750                     perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
751                     perChannel.queueSize -= size;
752                     queuesSize.addAndGet(-size);
753                     ctx.write(newToSend.toSend, newToSend.promise);
754                     perChannel.lastWriteTimestamp = now;
755                 } else {
756                     perChannel.messagesQueue.addFirst(newToSend);
757                     break;
758                 }
759             }
760             if (perChannel.messagesQueue.isEmpty()) {
761                 releaseWriteSuspended(ctx);
762             }
763         }
764         ctx.flush();
765     }
766 
767     @Override
768     public String toString() {
769         return new StringBuilder(340).append(super.toString())
770             .append(" Write Channel Limit: ").append(writeChannelLimit)
771             .append(" Read Channel Limit: ").append(readChannelLimit).toString();
772     }
773 }