View Javadoc
1   /*
2    * Copyright 2011 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.channel.ChannelDuplexHandler;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.util.Attribute;
26  import io.netty.util.AttributeKey;
27  import io.netty.util.internal.logging.InternalLogger;
28  import io.netty.util.internal.logging.InternalLoggerFactory;
29  
30  import java.util.concurrent.TimeUnit;
31  
/**
 * <p>AbstractTrafficShapingHandler makes it possible to limit the global bandwidth
 * (see {@link GlobalTrafficShapingHandler}) or the per-session
 * bandwidth (see {@link ChannelTrafficShapingHandler}), i.e. to perform traffic shaping.
 * It also lets you implement almost real-time monitoring of the bandwidth using
 * the monitors from {@link TrafficCounter}, which call back the method
 * {@code doAccounting} of this handler every checkInterval.</p>
 *
 * <p>If, for any particular reason, you want to stop the monitoring (accounting) or to change
 * the read/write limits or the check interval, several methods allow that:</p>
 * <ul>
 * <li>{@code configure} allows you to change the read or write limits, or the checkInterval</li>
 * <li>{@code trafficCounter} gives you access to the TrafficCounter, and so lets you stop
 * or start the monitoring, change the checkInterval directly, or read its values.</li>
 * </ul>
 */
48  public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler {
49      private static final InternalLogger logger =
50              InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
51      /**
52       * Default delay between two checks: 1s
53       */
54      public static final long DEFAULT_CHECK_INTERVAL = 1000;
55  
56     /**
57      * Default max delay in case of traffic shaping
58      * (during which no communication will occur).
59      * Shall be less than TIMEOUT. Here half of "standard" 30s
60      */
61      public static final long DEFAULT_MAX_TIME = 15000;
62  
63      /**
64       * Default max size to not exceed in buffer (write only).
65       */
66      static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
67  
68      /**
69       * Default minimal time to wait
70       */
71      static final long MINIMAL_WAIT = 10;
72  
73      /**
74       * Traffic Counter
75       */
76      protected TrafficCounter trafficCounter;
77  
78      /**
79       * Limit in B/s to apply to write
80       */
81      private volatile long writeLimit;
82  
83      /**
84       * Limit in B/s to apply to read
85       */
86      private volatile long readLimit;
87  
88      /**
89       * Max delay in wait
90       */
91      protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s
92  
93      /**
94       * Delay between two performance snapshots
95       */
96      protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
97  
98      static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
99              .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
100     static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
101             .getName() + ".REOPEN_TASK");
102 
103     /**
104      * Max time to delay before proposing to stop writing new objects from next handlers
105      */
106     volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
107     /**
108      * Max size in the list before proposing to stop writing new objects from next handlers
109      */
110     volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
111 
112     /**
113      * Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
114      * Set in final constructor. Must be between 1 and 31
115      */
116     final int userDefinedWritabilityIndex;
117 
118     /**
119      * Default value for Channel UserDefinedWritability index
120      */
121     static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
122 
123     /**
124      * Default value for Global UserDefinedWritability index
125      */
126     static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
127 
128     /**
129      * Default value for GlobalChannel UserDefinedWritability index
130      */
131     static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
132 
133     /**
134      * @param newTrafficCounter
135      *            the TrafficCounter to set
136      */
137     void setTrafficCounter(TrafficCounter newTrafficCounter) {
138         trafficCounter = newTrafficCounter;
139     }
140 
141     /**
142      * @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
143      *              For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
144      *              for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
145      *              for GlobalChannel TSH it is defined as
146      *              {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
147      */
148     protected int userDefinedWritabilityIndex() {
149         return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
150     }
151 
152     /**
153      * @param writeLimit
154      *          0 or a limit in bytes/s
155      * @param readLimit
156      *          0 or a limit in bytes/s
157      * @param checkInterval
158      *            The delay between two computations of performances for
159      *            channels or 0 if no stats are to be computed.
160      * @param maxTime
161      *            The maximum delay to wait in case of traffic excess.
162      *            Must be positive.
163      */
164     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
165         if (maxTime <= 0) {
166             throw new IllegalArgumentException("maxTime must be positive");
167         }
168 
169         userDefinedWritabilityIndex = userDefinedWritabilityIndex();
170         this.writeLimit = writeLimit;
171         this.readLimit = readLimit;
172         this.checkInterval = checkInterval;
173         this.maxTime = maxTime;
174     }
175 
176     /**
177      * Constructor using default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
178      * @param writeLimit
179      *            0 or a limit in bytes/s
180      * @param readLimit
181      *            0 or a limit in bytes/s
182      * @param checkInterval
183      *            The delay between two computations of performances for
184      *            channels or 0 if no stats are to be computed.
185      */
186     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
187         this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
188     }
189 
190     /**
191      * Constructor using default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
192      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
193      *
194      * @param writeLimit
195      *          0 or a limit in bytes/s
196      * @param readLimit
197      *          0 or a limit in bytes/s
198      */
199     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
200         this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
201     }
202 
203     /**
204      * Constructor using NO LIMIT, default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
205      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
206      */
207     protected AbstractTrafficShapingHandler() {
208         this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
209     }
210 
211     /**
212      * Constructor using NO LIMIT and
213      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
214      *
215      * @param checkInterval
216      *            The delay between two computations of performances for
217      *            channels or 0 if no stats are to be computed.
218      */
219     protected AbstractTrafficShapingHandler(long checkInterval) {
220         this(0, 0, checkInterval, DEFAULT_MAX_TIME);
221     }
222 
223     /**
224      * Change the underlying limitations and check interval.
225      * <p>Note the change will be taken as best effort, meaning
226      * that all already scheduled traffics will not be
227      * changed, but only applied to new traffics.</p>
228      * <p>So the expected usage of this method is to be used not too often,
229      * accordingly to the traffic shaping configuration.</p>
230      *
231      * @param newWriteLimit The new write limit (in bytes)
232      * @param newReadLimit The new read limit (in bytes)
233      * @param newCheckInterval The new check interval (in milliseconds)
234      */
235     public void configure(long newWriteLimit, long newReadLimit,
236             long newCheckInterval) {
237         configure(newWriteLimit, newReadLimit);
238         configure(newCheckInterval);
239     }
240 
241     /**
242      * Change the underlying limitations.
243      * <p>Note the change will be taken as best effort, meaning
244      * that all already scheduled traffics will not be
245      * changed, but only applied to new traffics.</p>
246      * <p>So the expected usage of this method is to be used not too often,
247      * accordingly to the traffic shaping configuration.</p>
248      *
249      * @param newWriteLimit The new write limit (in bytes)
250      * @param newReadLimit The new read limit (in bytes)
251      */
252     public void configure(long newWriteLimit, long newReadLimit) {
253         writeLimit = newWriteLimit;
254         readLimit = newReadLimit;
255         if (trafficCounter != null) {
256             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
257         }
258     }
259 
260     /**
261      * Change the check interval.
262      *
263      * @param newCheckInterval The new check interval (in milliseconds)
264      */
265     public void configure(long newCheckInterval) {
266         checkInterval = newCheckInterval;
267         if (trafficCounter != null) {
268             trafficCounter.configure(checkInterval);
269         }
270     }
271 
272     /**
273      * @return the writeLimit
274      */
275     public long getWriteLimit() {
276         return writeLimit;
277     }
278 
279     /**
280      * <p>Note the change will be taken as best effort, meaning
281      * that all already scheduled traffics will not be
282      * changed, but only applied to new traffics.</p>
283      * <p>So the expected usage of this method is to be used not too often,
284      * accordingly to the traffic shaping configuration.</p>
285      *
286      * @param writeLimit the writeLimit to set
287      */
288     public void setWriteLimit(long writeLimit) {
289         this.writeLimit = writeLimit;
290         if (trafficCounter != null) {
291             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
292         }
293     }
294 
295     /**
296      * @return the readLimit
297      */
298     public long getReadLimit() {
299         return readLimit;
300     }
301 
302     /**
303      * <p>Note the change will be taken as best effort, meaning
304      * that all already scheduled traffics will not be
305      * changed, but only applied to new traffics.</p>
306      * <p>So the expected usage of this method is to be used not too often,
307      * accordingly to the traffic shaping configuration.</p>
308      *
309      * @param readLimit the readLimit to set
310      */
311     public void setReadLimit(long readLimit) {
312         this.readLimit = readLimit;
313         if (trafficCounter != null) {
314             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
315         }
316     }
317 
318     /**
319      * @return the checkInterval
320      */
321     public long getCheckInterval() {
322         return checkInterval;
323     }
324 
325     /**
326      * @param checkInterval the interval in ms between each step check to set, default value being 1000 ms.
327      */
328     public void setCheckInterval(long checkInterval) {
329         this.checkInterval = checkInterval;
330         if (trafficCounter != null) {
331             trafficCounter.configure(checkInterval);
332         }
333     }
334 
335     /**
336      * <p>Note the change will be taken as best effort, meaning
337      * that all already scheduled traffics will not be
338      * changed, but only applied to new traffics.</p>
339      * <p>So the expected usage of this method is to be used not too often,
340      * accordingly to the traffic shaping configuration.</p>
341      *
342      * @param maxTime
343      *            Max delay in wait, shall be less than TIME OUT in related protocol.
344      *            Must be positive.
345      */
346     public void setMaxTimeWait(long maxTime) {
347         if (maxTime <= 0) {
348             throw new IllegalArgumentException("maxTime must be positive");
349         }
350         this.maxTime = maxTime;
351     }
352 
353     /**
354      * @return the max delay in wait to prevent TIME OUT
355      */
356     public long getMaxTimeWait() {
357         return maxTime;
358     }
359 
360     /**
361      * @return the maxWriteDelay
362      */
363     public long getMaxWriteDelay() {
364         return maxWriteDelay;
365     }
366 
367     /**
368      * <p>Note the change will be taken as best effort, meaning
369      * that all already scheduled traffics will not be
370      * changed, but only applied to new traffics.</p>
371      * <p>So the expected usage of this method is to be used not too often,
372      * accordingly to the traffic shaping configuration.</p>
373      *
374      * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
375      *              Must be positive.
376      */
377     public void setMaxWriteDelay(long maxWriteDelay) {
378         if (maxWriteDelay <= 0) {
379             throw new IllegalArgumentException("maxWriteDelay must be positive");
380         }
381         this.maxWriteDelay = maxWriteDelay;
382     }
383 
384     /**
385      * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
386      */
387     public long getMaxWriteSize() {
388         return maxWriteSize;
389     }
390 
391     /**
392      * <p>Note that this limit is a best effort on memory limitation to prevent Out Of
393      * Memory Exception. To ensure it works, the handler generating the write should
394      * use one of the way provided by Netty to handle the capacity:</p>
395      * <p>- the {@code Channel.isWritable()} property and the corresponding
396      * {@code channelWritabilityChanged()}</p>
397      * <p>- the {@code ChannelFuture.addListener(new GenericFutureListener())}</p>
398      *
399      * @param maxWriteSize the maximum Write Size allowed in the buffer
400      *            per channel before write suspended is set,
401      *            default being {@value #DEFAULT_MAX_SIZE} bytes.
402      */
403     public void setMaxWriteSize(long maxWriteSize) {
404         this.maxWriteSize = maxWriteSize;
405     }
406 
407     /**
408      * Called each time the accounting is computed from the TrafficCounters.
409      * This method could be used for instance to implement almost real time accounting.
410      *
411      * @param counter
412      *            the TrafficCounter that computes its performance
413      */
414     protected void doAccounting(TrafficCounter counter) {
415         // NOOP by default
416     }
417 
418     /**
419      * Class to implement setReadable at fix time
420      */
421     static final class ReopenReadTimerTask implements Runnable {
422         final ChannelHandlerContext ctx;
423         ReopenReadTimerTask(ChannelHandlerContext ctx) {
424             this.ctx = ctx;
425         }
426 
427         @Override
428         public void run() {
429             ChannelConfig config = ctx.channel().config();
430             if (!config.isAutoRead() && isHandlerActive(ctx)) {
431                 // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
432                 // Then Just reset the status
433                 if (logger.isDebugEnabled()) {
434                     logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
435                             isHandlerActive(ctx));
436                 }
437                 ctx.attr(READ_SUSPENDED).set(false);
438             } else {
439                 // Anything else allows the handler to reset the AutoRead
440                 if (logger.isDebugEnabled()) {
441                     if (config.isAutoRead() && !isHandlerActive(ctx)) {
442                         logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
443                                 isHandlerActive(ctx));
444                     } else {
445                         logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
446                                 + isHandlerActive(ctx));
447                     }
448                 }
449                 ctx.attr(READ_SUSPENDED).set(false);
450                 config.setAutoRead(true);
451                 ctx.channel().read();
452             }
453             if (logger.isDebugEnabled()) {
454                 logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
455                         + isHandlerActive(ctx));
456             }
457         }
458     }
459 
460     /**
461      * Release the Read suspension
462      */
463     void releaseReadSuspended(ChannelHandlerContext ctx) {
464         ctx.attr(READ_SUSPENDED).set(false);
465         ctx.channel().config().setAutoRead(true);
466     }
467 
468     @Override
469     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
470         long size = calculateSize(msg);
471         long now = TrafficCounter.milliSecondFromNano();
472         if (size > 0) {
473             // compute the number of ms to wait before reopening the channel
474             long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
475             wait = checkWaitReadTime(ctx, wait, now);
476             if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
477                 // time in order to try to limit the traffic
478                 // Only AutoRead AND HandlerActive True means Context Active
479                 ChannelConfig config = ctx.channel().config();
480                 if (logger.isDebugEnabled()) {
481                     logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
482                             + isHandlerActive(ctx));
483                 }
484                 if (config.isAutoRead() && isHandlerActive(ctx)) {
485                     config.setAutoRead(false);
486                     ctx.attr(READ_SUSPENDED).set(true);
487                     // Create a Runnable to reactive the read if needed. If one was create before it will just be
488                     // reused to limit object creation
489                     Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
490                     Runnable reopenTask = attr.get();
491                     if (reopenTask == null) {
492                         reopenTask = new ReopenReadTimerTask(ctx);
493                         attr.set(reopenTask);
494                     }
495                     ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
496                     if (logger.isDebugEnabled()) {
497                         logger.debug("Suspend final status => " + config.isAutoRead() + ':'
498                                 + isHandlerActive(ctx) + " will reopened at: " + wait);
499                     }
500                 }
501             }
502         }
503         informReadOperation(ctx, now);
504         ctx.fireChannelRead(msg);
505     }
506 
507     /**
508      * Method overridden in GTSH to take into account specific timer for the channel.
509      * @param wait the wait delay computed in ms
510      * @param now the relative now time in ms
511      * @return the wait to use according to the context
512      */
513     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
514         // no change by default
515         return wait;
516     }
517 
518     /**
519      * Method overridden in GTSH to take into account specific timer for the channel.
520      * @param now the relative now time in ms
521      */
522     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
523         // default noop
524     }
525 
526     protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
527         Boolean suspended = ctx.attr(READ_SUSPENDED).get();
528         return suspended == null || Boolean.FALSE.equals(suspended);
529     }
530 
531     @Override
532     public void read(ChannelHandlerContext ctx) {
533         if (isHandlerActive(ctx)) {
534             // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
535             ctx.read();
536         }
537     }
538 
539     @Override
540     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
541             throws Exception {
542         long size = calculateSize(msg);
543         long now = TrafficCounter.milliSecondFromNano();
544         if (size > 0) {
545             // compute the number of ms to wait before continue with the channel
546             long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
547             if (wait >= MINIMAL_WAIT) {
548                 if (logger.isDebugEnabled()) {
549                     logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
550                             + isHandlerActive(ctx));
551                 }
552                 submitWrite(ctx, msg, size, wait, now, promise);
553                 return;
554             }
555         }
556         // to maintain order of write
557         submitWrite(ctx, msg, size, 0, now, promise);
558     }
559 
560     @Deprecated
561     protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
562             final long delay, final ChannelPromise promise) {
563         submitWrite(ctx, msg, calculateSize(msg),
564                 delay, TrafficCounter.milliSecondFromNano(), promise);
565     }
566 
567     abstract void submitWrite(
568             ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
569 
570     @Override
571     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
572         setUserDefinedWritability(ctx, true);
573         super.channelRegistered(ctx);
574     }
575 
576     void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
577         ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
578         if (cob != null) {
579             cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
580         }
581     }
582 
583     /**
584      * Check the writability according to delay and size for the channel.
585      * Set if necessary setUserDefinedWritability status.
586      * @param delay the computed delay
587      * @param queueSize the current queueSize
588      */
589     void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
590         if (queueSize > maxWriteSize || delay > maxWriteDelay) {
591             setUserDefinedWritability(ctx, false);
592         }
593     }
594     /**
595      * Explicitly release the Write suspended status.
596      */
597     void releaseWriteSuspended(ChannelHandlerContext ctx) {
598         setUserDefinedWritability(ctx, true);
599     }
600 
601     /**
602      * @return the current TrafficCounter (if
603      *         channel is still connected)
604      */
605     public TrafficCounter trafficCounter() {
606         return trafficCounter;
607     }
608 
609     @Override
610     public String toString() {
611         StringBuilder builder = new StringBuilder(290)
612             .append("TrafficShaping with Write Limit: ").append(writeLimit)
613             .append(" Read Limit: ").append(readLimit)
614             .append(" CheckInterval: ").append(checkInterval)
615             .append(" maxDelay: ").append(maxWriteDelay)
616             .append(" maxSize: ").append(maxWriteSize)
617             .append(" and Counter: ");
618         if (trafficCounter != null) {
619             builder.append(trafficCounter);
620         } else {
621             builder.append("none");
622         }
623         return builder.toString();
624     }
625 
626     /**
627      * Calculate the size of the given {@link Object}.
628      *
629      * This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may override this.
630      * @param msg the msg for which the size should be calculated.
631      * @return size the size of the msg or {@code -1} if unknown.
632      */
633     protected long calculateSize(Object msg) {
634         if (msg instanceof ByteBuf) {
635             return ((ByteBuf) msg).readableBytes();
636         }
637         if (msg instanceof ByteBufHolder) {
638             return ((ByteBufHolder) msg).content().readableBytes();
639         }
640         return -1;
641     }
642 }