View Javadoc
1   /*
2    * Copyright 2011 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.channel.ChannelHandlerAdapter;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.util.Attribute;
26  import io.netty.util.AttributeKey;
27  import io.netty.util.internal.logging.InternalLogger;
28  import io.netty.util.internal.logging.InternalLoggerFactory;
29  
30  import java.util.concurrent.TimeUnit;
31  
32  /**
33   * <p>AbstractTrafficShapingHandler allows to limit the global bandwidth
34   * (see {@link GlobalTrafficShapingHandler}) or per session
35   * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
36   * It allows you to implement an almost real time monitoring of the bandwidth using
37   * the monitors from {@link TrafficCounter} that will call back every checkInterval
38   * the method doAccounting of this handler.</p>
39   *
40   * <p>If you want for any particular reasons to stop the monitoring (accounting) or to change
41   * the read/write limit or the check interval, several methods allow that for you:</p>
42   * <ul>
43   * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
44   * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
45   * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
46   * </ul>
47   */
48  public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapter {
49      private static final InternalLogger logger =
50              InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
51      /**
52       * Default delay between two checks: 1s
53       */
54      public static final long DEFAULT_CHECK_INTERVAL = 1000;
55  
56     /**
57      * Default max delay in case of traffic shaping
58      * (during which no communication will occur).
59      * Shall be less than TIMEOUT. Here half of "standard" 30s
60      */
61      public static final long DEFAULT_MAX_TIME = 15000;
62  
63      /**
64       * Default max size to not exceed in buffer (write only).
65       */
66      static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
67  
68      /**
69       * Default minimal time to wait
70       */
71      static final long MINIMAL_WAIT = 10;
72  
73      /**
74       * Traffic Counter
75       */
76      protected TrafficCounter trafficCounter;
77  
78      /**
79       * Limit in B/s to apply to write
80       */
81      private volatile long writeLimit;
82  
83      /**
84       * Limit in B/s to apply to read
85       */
86      private volatile long readLimit;
87  
88      /**
89       * Max delay in wait
90       */
91      protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s
92  
93      /**
94       * Delay between two performance snapshots
95       */
96      protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
97  
98      static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
99              .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
100     static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
101             .getName() + ".REOPEN_TASK");
102 
103     /**
104      * Max time to delay before proposing to stop writing new objects from next handlers
105      */
106     volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
107     /**
108      * Max size in the list before proposing to stop writing new objects from next handlers
109      */
110     volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
111 
112     /**
113      * Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
114      * Set in final constructor. Must be between 1 and 31
115      */
116     final int userDefinedWritabilityIndex;
117 
118     /**
119      * Default value for Channel UserDefinedWritability index
120      */
121     static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
122 
123     /**
124      * Default value for Global UserDefinedWritability index
125      */
126     static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
127 
128     /**
129      * Default value for GlobalChannel UserDefinedWritability index
130      */
131     static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
132 
133     /**
134      * @param newTrafficCounter
135      *            the TrafficCounter to set
136      */
137     void setTrafficCounter(TrafficCounter newTrafficCounter) {
138         trafficCounter = newTrafficCounter;
139     }
140 
141     /**
142      * @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
143      *              For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
144      *              for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
145      *              for GlobalChannel TSH it is defined as
146      *              {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
147      */
148     int userDefinedWritabilityIndex() {
149         if (this instanceof GlobalChannelTrafficShapingHandler) {
150             return GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
151         } else if (this instanceof GlobalTrafficShapingHandler) {
152             return GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
153         } else {
154             return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
155         }
156     }
157 
158     /**
159      * @param writeLimit
160      *          0 or a limit in bytes/s
161      * @param readLimit
162      *          0 or a limit in bytes/s
163      * @param checkInterval
164      *            The delay between two computations of performances for
165      *            channels or 0 if no stats are to be computed.
166      * @param maxTime
167      *            The maximum delay to wait in case of traffic excess.
168      *            Must be positive.
169      */
170     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
171         if (maxTime <= 0) {
172             throw new IllegalArgumentException("maxTime must be positive");
173         }
174 
175         userDefinedWritabilityIndex = userDefinedWritabilityIndex();
176         this.writeLimit = writeLimit;
177         this.readLimit = readLimit;
178         this.checkInterval = checkInterval;
179         this.maxTime = maxTime;
180     }
181 
182     /**
183      * Constructor using default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
184      * @param writeLimit
185      *            0 or a limit in bytes/s
186      * @param readLimit
187      *            0 or a limit in bytes/s
188      * @param checkInterval
189      *            The delay between two computations of performances for
190      *            channels or 0 if no stats are to be computed.
191      */
192     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
193         this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
194     }
195 
196     /**
197      * Constructor using default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
198      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
199      *
200      * @param writeLimit
201      *          0 or a limit in bytes/s
202      * @param readLimit
203      *          0 or a limit in bytes/s
204      */
205     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
206         this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
207     }
208 
209     /**
210      * Constructor using NO LIMIT, default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
211      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
212      */
213     protected AbstractTrafficShapingHandler() {
214         this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
215     }
216 
217     /**
218      * Constructor using NO LIMIT and
219      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
220      *
221      * @param checkInterval
222      *            The delay between two computations of performances for
223      *            channels or 0 if no stats are to be computed.
224      */
225     protected AbstractTrafficShapingHandler(long checkInterval) {
226         this(0, 0, checkInterval, DEFAULT_MAX_TIME);
227     }
228 
229     /**
230      * Change the underlying limitations and check interval.
231      * <p>Note the change will be taken as best effort, meaning
232      * that all already scheduled traffics will not be
233      * changed, but only applied to new traffics.</p>
234      * <p>So the expected usage of this method is to be used not too often,
235      * accordingly to the traffic shaping configuration.</p>
236      *
237      * @param newWriteLimit The new write limit (in bytes)
238      * @param newReadLimit The new read limit (in bytes)
239      * @param newCheckInterval The new check interval (in milliseconds)
240      */
241     public void configure(long newWriteLimit, long newReadLimit,
242             long newCheckInterval) {
243         configure(newWriteLimit, newReadLimit);
244         configure(newCheckInterval);
245     }
246 
247     /**
248      * Change the underlying limitations.
249      * <p>Note the change will be taken as best effort, meaning
250      * that all already scheduled traffics will not be
251      * changed, but only applied to new traffics.</p>
252      * <p>So the expected usage of this method is to be used not too often,
253      * accordingly to the traffic shaping configuration.</p>
254      *
255      * @param newWriteLimit The new write limit (in bytes)
256      * @param newReadLimit The new read limit (in bytes)
257      */
258     public void configure(long newWriteLimit, long newReadLimit) {
259         writeLimit = newWriteLimit;
260         readLimit = newReadLimit;
261         if (trafficCounter != null) {
262             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
263         }
264     }
265 
266     /**
267      * Change the check interval.
268      *
269      * @param newCheckInterval The new check interval (in milliseconds)
270      */
271     public void configure(long newCheckInterval) {
272         checkInterval = newCheckInterval;
273         if (trafficCounter != null) {
274             trafficCounter.configure(checkInterval);
275         }
276     }
277 
278     /**
279      * @return the writeLimit
280      */
281     public long getWriteLimit() {
282         return writeLimit;
283     }
284 
285     /**
286      * <p>Note the change will be taken as best effort, meaning
287      * that all already scheduled traffics will not be
288      * changed, but only applied to new traffics.</p>
289      * <p>So the expected usage of this method is to be used not too often,
290      * accordingly to the traffic shaping configuration.</p>
291      *
292      * @param writeLimit the writeLimit to set
293      */
294     public void setWriteLimit(long writeLimit) {
295         this.writeLimit = writeLimit;
296         if (trafficCounter != null) {
297             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
298         }
299     }
300 
301     /**
302      * @return the readLimit
303      */
304     public long getReadLimit() {
305         return readLimit;
306     }
307 
308     /**
309      * <p>Note the change will be taken as best effort, meaning
310      * that all already scheduled traffics will not be
311      * changed, but only applied to new traffics.</p>
312      * <p>So the expected usage of this method is to be used not too often,
313      * accordingly to the traffic shaping configuration.</p>
314      *
315      * @param readLimit the readLimit to set
316      */
317     public void setReadLimit(long readLimit) {
318         this.readLimit = readLimit;
319         if (trafficCounter != null) {
320             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
321         }
322     }
323 
324     /**
325      * @return the checkInterval
326      */
327     public long getCheckInterval() {
328         return checkInterval;
329     }
330 
331     /**
332      * @param checkInterval the interval in ms between each step check to set, default value beeing 1000 ms.
333      */
334     public void setCheckInterval(long checkInterval) {
335         this.checkInterval = checkInterval;
336         if (trafficCounter != null) {
337             trafficCounter.configure(checkInterval);
338         }
339     }
340 
341     /**
342      * <p>Note the change will be taken as best effort, meaning
343      * that all already scheduled traffics will not be
344      * changed, but only applied to new traffics.</p>
345      * <p>So the expected usage of this method is to be used not too often,
346      * accordingly to the traffic shaping configuration.</p>
347      *
348      * @param maxTime
349      *            Max delay in wait, shall be less than TIME OUT in related protocol.
350      *            Must be positive.
351      */
352     public void setMaxTimeWait(long maxTime) {
353         if (maxTime <= 0) {
354             throw new IllegalArgumentException("maxTime must be positive");
355         }
356         this.maxTime = maxTime;
357     }
358 
359     /**
360      * @return the max delay in wait to prevent TIME OUT
361      */
362     public long getMaxTimeWait() {
363         return maxTime;
364     }
365 
366     /**
367      * @return the maxWriteDelay
368      */
369     public long getMaxWriteDelay() {
370         return maxWriteDelay;
371     }
372 
373     /**
374      * <p>Note the change will be taken as best effort, meaning
375      * that all already scheduled traffics will not be
376      * changed, but only applied to new traffics.</p>
377      * <p>So the expected usage of this method is to be used not too often,
378      * accordingly to the traffic shaping configuration.</p>
379      *
380      * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
381      *              Must be positive.
382      */
383     public void setMaxWriteDelay(long maxWriteDelay) {
384         if (maxWriteDelay <= 0) {
385             throw new IllegalArgumentException("maxWriteDelay must be positive");
386         }
387         this.maxWriteDelay = maxWriteDelay;
388     }
389 
390     /**
391      * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
392      */
393     public long getMaxWriteSize() {
394         return maxWriteSize;
395     }
396 
397     /**
398      * <p>Note that this limit is a best effort on memory limitation to prevent Out Of
399      * Memory Exception. To ensure it works, the handler generating the write should
400      * use one of the way provided by Netty to handle the capacity:</p>
401      * <p>- the {@code Channel.isWritable()} property and the corresponding
402      * {@code channelWritabilityChanged()}</p>
403      * <p>- the {@code ChannelFuture.addListener(new GenericFutureListener())}</p>
404      *
405      * @param maxWriteSize the maximum Write Size allowed in the buffer
406      *            per channel before write suspended is set,
407      *            default being {@value #DEFAULT_MAX_SIZE} bytes.
408      */
409     public void setMaxWriteSize(long maxWriteSize) {
410         this.maxWriteSize = maxWriteSize;
411     }
412 
413     /**
414      * Called each time the accounting is computed from the TrafficCounters.
415      * This method could be used for instance to implement almost real time accounting.
416      *
417      * @param counter
418      *            the TrafficCounter that computes its performance
419      */
420     protected void doAccounting(TrafficCounter counter) {
421         // NOOP by default
422     }
423 
424     /**
425      * Class to implement setReadable at fix time
426      */
427     static final class ReopenReadTimerTask implements Runnable {
428         final ChannelHandlerContext ctx;
429         ReopenReadTimerTask(ChannelHandlerContext ctx) {
430             this.ctx = ctx;
431         }
432 
433         @Override
434         public void run() {
435             ChannelConfig config = ctx.channel().config();
436             if (!config.isAutoRead() && isHandlerActive(ctx)) {
437                 // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
438                 // Then Just reset the status
439                 if (logger.isDebugEnabled()) {
440                     logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
441                             isHandlerActive(ctx));
442                 }
443                 ctx.attr(READ_SUSPENDED).set(false);
444             } else {
445                 // Anything else allows the handler to reset the AutoRead
446                 if (logger.isDebugEnabled()) {
447                     if (config.isAutoRead() && !isHandlerActive(ctx)) {
448                         logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
449                                 isHandlerActive(ctx));
450                     } else {
451                         logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
452                                 + isHandlerActive(ctx));
453                     }
454                 }
455                 ctx.attr(READ_SUSPENDED).set(false);
456                 config.setAutoRead(true);
457                 ctx.channel().read();
458             }
459             if (logger.isDebugEnabled()) {
460                 logger.debug("Unsupsend final status => " + config.isAutoRead() + ':'
461                         + isHandlerActive(ctx));
462             }
463         }
464     }
465 
466     /**
467      * Release the Read suspension
468      */
469     void releaseReadSuspended(ChannelHandlerContext ctx) {
470         ctx.attr(READ_SUSPENDED).set(false);
471         ctx.channel().config().setAutoRead(true);
472     }
473 
474     @Override
475     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
476         long size = calculateSize(msg);
477         long now = TrafficCounter.milliSecondFromNano();
478         if (size > 0) {
479             // compute the number of ms to wait before reopening the channel
480             long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
481             wait = checkWaitReadTime(ctx, wait, now);
482             if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
483                 // time in order to try to limit the traffic
484                 // Only AutoRead AND HandlerActive True means Context Active
485                 ChannelConfig config = ctx.channel().config();
486                 if (logger.isDebugEnabled()) {
487                     logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
488                             + isHandlerActive(ctx));
489                 }
490                 if (config.isAutoRead() && isHandlerActive(ctx)) {
491                     config.setAutoRead(false);
492                     ctx.attr(READ_SUSPENDED).set(true);
493                     // Create a Runnable to reactive the read if needed. If one was create before it will just be
494                     // reused to limit object creation
495                     Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
496                     Runnable reopenTask = attr.get();
497                     if (reopenTask == null) {
498                         reopenTask = new ReopenReadTimerTask(ctx);
499                         attr.set(reopenTask);
500                     }
501                     ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
502                     if (logger.isDebugEnabled()) {
503                         logger.debug("Suspend final status => " + config.isAutoRead() + ':'
504                                 + isHandlerActive(ctx) + " will reopened at: " + wait);
505                     }
506                 }
507             }
508         }
509         informReadOperation(ctx, now);
510         ctx.fireChannelRead(msg);
511     }
512 
513     /**
514      * Method overridden in GTSH to take into account specific timer for the channel.
515      * @param wait the wait delay computed in ms
516      * @param now the relative now time in ms
517      * @return the wait to use according to the context
518      */
519     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
520         // no change by default
521         return wait;
522     }
523 
524     /**
525      * Method overridden in GTSH to take into account specific timer for the channel.
526      * @param now the relative now time in ms
527      */
528     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
529         // default noop
530     }
531 
532     protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
533         Boolean suspended = ctx.attr(READ_SUSPENDED).get();
534         return suspended == null || Boolean.FALSE.equals(suspended);
535     }
536 
537     @Override
538     public void read(ChannelHandlerContext ctx) {
539         if (isHandlerActive(ctx)) {
540             // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
541             ctx.read();
542         }
543     }
544 
545     @Override
546     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
547             throws Exception {
548         long size = calculateSize(msg);
549         long now = TrafficCounter.milliSecondFromNano();
550         if (size > 0) {
551             // compute the number of ms to wait before continue with the channel
552             long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
553             if (wait >= MINIMAL_WAIT) {
554                 if (logger.isDebugEnabled()) {
555                     logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
556                             + isHandlerActive(ctx));
557                 }
558                 submitWrite(ctx, msg, size, wait, now, promise);
559                 return;
560             }
561         }
562         // to maintain order of write
563         submitWrite(ctx, msg, size, 0, now, promise);
564     }
565 
566     @Deprecated
567     protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
568             final long delay, final ChannelPromise promise) {
569         submitWrite(ctx, msg, calculateSize(msg),
570                 delay, TrafficCounter.milliSecondFromNano(), promise);
571     }
572 
573     abstract void submitWrite(
574             ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
575 
576     @Override
577     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
578         setUserDefinedWritability(ctx, true);
579         super.channelRegistered(ctx);
580     }
581 
582     void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
583         ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
584         if (cob != null) {
585             cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
586         }
587     }
588 
589     /**
590      * Check the writability according to delay and size for the channel.
591      * Set if necessary setUserDefinedWritability status.
592      * @param delay the computed delay
593      * @param queueSize the current queueSize
594      */
595     void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
596         if (queueSize > maxWriteSize || delay > maxWriteDelay) {
597             setUserDefinedWritability(ctx, false);
598         }
599     }
600     /**
601      * Explicitly release the Write suspended status.
602      */
603     void releaseWriteSuspended(ChannelHandlerContext ctx) {
604         setUserDefinedWritability(ctx, true);
605     }
606 
607     /**
608      * @return the current TrafficCounter (if
609      *         channel is still connected)
610      */
611     public TrafficCounter trafficCounter() {
612         return trafficCounter;
613     }
614 
615     @Override
616     public String toString() {
617         StringBuilder builder = new StringBuilder(290)
618             .append("TrafficShaping with Write Limit: ").append(writeLimit)
619             .append(" Read Limit: ").append(readLimit)
620             .append(" CheckInterval: ").append(checkInterval)
621             .append(" maxDelay: ").append(maxWriteDelay)
622             .append(" maxSize: ").append(maxWriteSize)
623             .append(" and Counter: ");
624         if (trafficCounter != null) {
625             builder.append(trafficCounter);
626         } else {
627             builder.append("none");
628         }
629         return builder.toString();
630     }
631 
632     /**
633      * Calculate the size of the given {@link Object}.
634      *
635      * This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may override this.
636      *
637      * @param msg
638      *            the msg for which the size should be calculated.
639      * @return size the size of the msg or {@code -1} if unknown.
640      */
641     protected long calculateSize(Object msg) {
642         if (msg instanceof ByteBuf) {
643             return ((ByteBuf) msg).readableBytes();
644         }
645         if (msg instanceof ByteBufHolder) {
646             return ((ByteBufHolder) msg).content().readableBytes();
647         }
648         return -1;
649     }
650 }