View Javadoc
1   /*
2    * Copyright 2011 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositive;
19  
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufHolder;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelDuplexHandler;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelOutboundBuffer;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.FileRegion;
29  import io.netty.util.Attribute;
30  import io.netty.util.AttributeKey;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.util.concurrent.TimeUnit;
35  
36  /**
37   * <p>AbstractTrafficShapingHandler allows to limit the global bandwidth
38   * (see {@link GlobalTrafficShapingHandler}) or per session
39   * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
40   * It allows you to implement an almost real time monitoring of the bandwidth using
41   * the monitors from {@link TrafficCounter} that will call back every checkInterval
42   * the method doAccounting of this handler.</p>
43   *
44   * <p>If you want for any particular reasons to stop the monitoring (accounting) or to change
45   * the read/write limit or the check interval, several methods allow that for you:</p>
46   * <ul>
47   * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
48   * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
49   * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
50   * </ul>
51   */
52  public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler {
53      private static final InternalLogger logger =
54              InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
55      /**
56       * Default delay between two checks: 1s
57       */
58      public static final long DEFAULT_CHECK_INTERVAL = 1000;
59  
60     /**
61      * Default max delay in case of traffic shaping
62      * (during which no communication will occur).
63      * Shall be less than TIMEOUT. Here half of "standard" 30s
64      */
65      public static final long DEFAULT_MAX_TIME = 15000;
66  
67      /**
68       * Default max size to not exceed in buffer (write only).
69       */
70      static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
71  
72      /**
73       * Default minimal time to wait: 10ms
74       */
75      static final long MINIMAL_WAIT = 10;
76  
77      /**
78       * Traffic Counter
79       */
80      protected TrafficCounter trafficCounter;
81  
82      /**
83       * Limit in B/s to apply to write
84       */
85      private volatile long writeLimit;
86  
87      /**
88       * Limit in B/s to apply to read
89       */
90      private volatile long readLimit;
91  
92      /**
93       * Max delay in wait
94       */
95      protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s
96  
97      /**
98       * Delay between two performance snapshots
99       */
100     protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
101 
102     static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
103             .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
104     static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
105             .getName() + ".REOPEN_TASK");
106 
107     /**
108      * Max time to delay before proposing to stop writing new objects from next handlers
109      */
110     volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
111     /**
112      * Max size in the list before proposing to stop writing new objects from next handlers
113      */
114     volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
115 
116     /**
117      * Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
118      * Set in final constructor. Must be between 1 and 31
119      */
120     final int userDefinedWritabilityIndex;
121 
122     /**
123      * Default value for Channel UserDefinedWritability index
124      */
125     static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
126 
127     /**
128      * Default value for Global UserDefinedWritability index
129      */
130     static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
131 
132     /**
133      * Default value for GlobalChannel UserDefinedWritability index
134      */
135     static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
136 
137     /**
138      * @param newTrafficCounter
139      *            the TrafficCounter to set
140      */
141     void setTrafficCounter(TrafficCounter newTrafficCounter) {
142         trafficCounter = newTrafficCounter;
143     }
144 
145     /**
146      * @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
147      *              For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
148      *              for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
149      *              for GlobalChannel TSH it is defined as
150      *              {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
151      */
152     protected int userDefinedWritabilityIndex() {
153         return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
154     }
155 
156     /**
157      * @param writeLimit
158      *          0 or a limit in bytes/s
159      * @param readLimit
160      *          0 or a limit in bytes/s
161      * @param checkInterval
162      *            The delay between two computations of performances for
163      *            channels or 0 if no stats are to be computed.
164      * @param maxTime
165      *            The maximum delay to wait in case of traffic excess.
166      *            Must be positive.
167      */
168     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
169         this.maxTime = checkPositive(maxTime, "maxTime");
170 
171         userDefinedWritabilityIndex = userDefinedWritabilityIndex();
172         this.writeLimit = writeLimit;
173         this.readLimit = readLimit;
174         this.checkInterval = checkInterval;
175     }
176 
177     /**
178      * Constructor using default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
179      * @param writeLimit
180      *            0 or a limit in bytes/s
181      * @param readLimit
182      *            0 or a limit in bytes/s
183      * @param checkInterval
184      *            The delay between two computations of performances for
185      *            channels or 0 if no stats are to be computed.
186      */
187     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
188         this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
189     }
190 
191     /**
192      * Constructor using default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
193      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
194      *
195      * @param writeLimit
196      *          0 or a limit in bytes/s
197      * @param readLimit
198      *          0 or a limit in bytes/s
199      */
200     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
201         this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
202     }
203 
204     /**
205      * Constructor using NO LIMIT, default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
206      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
207      */
208     protected AbstractTrafficShapingHandler() {
209         this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
210     }
211 
212     /**
213      * Constructor using NO LIMIT and
214      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
215      *
216      * @param checkInterval
217      *            The delay between two computations of performances for
218      *            channels or 0 if no stats are to be computed.
219      */
220     protected AbstractTrafficShapingHandler(long checkInterval) {
221         this(0, 0, checkInterval, DEFAULT_MAX_TIME);
222     }
223 
224     /**
225      * Change the underlying limitations and check interval.
226      * <p>Note the change will be taken as best effort, meaning
227      * that all already scheduled traffics will not be
228      * changed, but only applied to new traffics.</p>
229      * <p>So the expected usage of this method is to be used not too often,
230      * accordingly to the traffic shaping configuration.</p>
231      *
232      * @param newWriteLimit The new write limit (in bytes)
233      * @param newReadLimit The new read limit (in bytes)
234      * @param newCheckInterval The new check interval (in milliseconds)
235      */
236     public void configure(long newWriteLimit, long newReadLimit,
237             long newCheckInterval) {
238         configure(newWriteLimit, newReadLimit);
239         configure(newCheckInterval);
240     }
241 
242     /**
243      * Change the underlying limitations.
244      * <p>Note the change will be taken as best effort, meaning
245      * that all already scheduled traffics will not be
246      * changed, but only applied to new traffics.</p>
247      * <p>So the expected usage of this method is to be used not too often,
248      * accordingly to the traffic shaping configuration.</p>
249      *
250      * @param newWriteLimit The new write limit (in bytes)
251      * @param newReadLimit The new read limit (in bytes)
252      */
253     public void configure(long newWriteLimit, long newReadLimit) {
254         writeLimit = newWriteLimit;
255         readLimit = newReadLimit;
256         if (trafficCounter != null) {
257             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
258         }
259     }
260 
261     /**
262      * Change the check interval.
263      *
264      * @param newCheckInterval The new check interval (in milliseconds)
265      */
266     public void configure(long newCheckInterval) {
267         checkInterval = newCheckInterval;
268         if (trafficCounter != null) {
269             trafficCounter.configure(checkInterval);
270         }
271     }
272 
273     /**
274      * @return the writeLimit
275      */
276     public long getWriteLimit() {
277         return writeLimit;
278     }
279 
280     /**
281      * <p>Note the change will be taken as best effort, meaning
282      * that all already scheduled traffics will not be
283      * changed, but only applied to new traffics.</p>
284      * <p>So the expected usage of this method is to be used not too often,
285      * accordingly to the traffic shaping configuration.</p>
286      *
287      * @param writeLimit the writeLimit to set
288      */
289     public void setWriteLimit(long writeLimit) {
290         this.writeLimit = writeLimit;
291         if (trafficCounter != null) {
292             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
293         }
294     }
295 
296     /**
297      * @return the readLimit
298      */
299     public long getReadLimit() {
300         return readLimit;
301     }
302 
303     /**
304      * <p>Note the change will be taken as best effort, meaning
305      * that all already scheduled traffics will not be
306      * changed, but only applied to new traffics.</p>
307      * <p>So the expected usage of this method is to be used not too often,
308      * accordingly to the traffic shaping configuration.</p>
309      *
310      * @param readLimit the readLimit to set
311      */
312     public void setReadLimit(long readLimit) {
313         this.readLimit = readLimit;
314         if (trafficCounter != null) {
315             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
316         }
317     }
318 
319     /**
320      * @return the checkInterval
321      */
322     public long getCheckInterval() {
323         return checkInterval;
324     }
325 
326     /**
327      * @param checkInterval the interval in ms between each step check to set, default value being 1000 ms.
328      */
329     public void setCheckInterval(long checkInterval) {
330         this.checkInterval = checkInterval;
331         if (trafficCounter != null) {
332             trafficCounter.configure(checkInterval);
333         }
334     }
335 
336     /**
337      * <p>Note the change will be taken as best effort, meaning
338      * that all already scheduled traffics will not be
339      * changed, but only applied to new traffics.</p>
340      * <p>So the expected usage of this method is to be used not too often,
341      * accordingly to the traffic shaping configuration.</p>
342      *
343      * @param maxTime
344      *            Max delay in wait, shall be less than TIME OUT in related protocol.
345      *            Must be positive.
346      */
347     public void setMaxTimeWait(long maxTime) {
348         this.maxTime = checkPositive(maxTime, "maxTime");
349     }
350 
351     /**
352      * @return the max delay in wait to prevent TIME OUT
353      */
354     public long getMaxTimeWait() {
355         return maxTime;
356     }
357 
358     /**
359      * @return the maxWriteDelay
360      */
361     public long getMaxWriteDelay() {
362         return maxWriteDelay;
363     }
364 
365     /**
366      * <p>Note the change will be taken as best effort, meaning
367      * that all already scheduled traffics will not be
368      * changed, but only applied to new traffics.</p>
369      * <p>So the expected usage of this method is to be used not too often,
370      * accordingly to the traffic shaping configuration.</p>
371      *
372      * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
373      *              Must be positive.
374      */
375     public void setMaxWriteDelay(long maxWriteDelay) {
376         this.maxWriteDelay = checkPositive(maxWriteDelay, "maxWriteDelay");
377     }
378 
379     /**
380      * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
381      */
382     public long getMaxWriteSize() {
383         return maxWriteSize;
384     }
385 
386     /**
387      * <p>Note that this limit is a best effort on memory limitation to prevent Out Of
388      * Memory Exception. To ensure it works, the handler generating the write should
389      * use one of the way provided by Netty to handle the capacity:</p>
390      * <p>- the {@code Channel.isWritable()} property and the corresponding
391      * {@code channelWritabilityChanged()}</p>
392      * <p>- the {@code ChannelFuture.addListener(new GenericFutureListener())}</p>
393      *
394      * @param maxWriteSize the maximum Write Size allowed in the buffer
395      *            per channel before write suspended is set,
396      *            default being {@value #DEFAULT_MAX_SIZE} bytes.
397      */
398     public void setMaxWriteSize(long maxWriteSize) {
399         this.maxWriteSize = maxWriteSize;
400     }
401 
402     /**
403      * Called each time the accounting is computed from the TrafficCounters.
404      * This method could be used for instance to implement almost real time accounting.
405      *
406      * @param counter
407      *            the TrafficCounter that computes its performance
408      */
409     protected void doAccounting(TrafficCounter counter) {
410         // NOOP by default
411     }
412 
413     /**
414      * Class to implement setReadable at fix time
415      */
416     static final class ReopenReadTimerTask implements Runnable {
417         final ChannelHandlerContext ctx;
418         ReopenReadTimerTask(ChannelHandlerContext ctx) {
419             this.ctx = ctx;
420         }
421 
422         @Override
423         public void run() {
424             Channel channel = ctx.channel();
425             ChannelConfig config = channel.config();
426             if (!config.isAutoRead() && isHandlerActive(ctx)) {
427                 // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
428                 // Then Just reset the status
429                 if (logger.isDebugEnabled()) {
430                     logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
431                             isHandlerActive(ctx));
432                 }
433                 channel.attr(READ_SUSPENDED).set(false);
434             } else {
435                 // Anything else allows the handler to reset the AutoRead
436                 if (logger.isDebugEnabled()) {
437                     if (config.isAutoRead() && !isHandlerActive(ctx)) {
438                         if (logger.isDebugEnabled()) {
439                             logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
440                                     isHandlerActive(ctx));
441                         }
442                     } else {
443                         if (logger.isDebugEnabled()) {
444                             logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
445                                     + isHandlerActive(ctx));
446                         }
447                     }
448                 }
449                 channel.attr(READ_SUSPENDED).set(false);
450                 config.setAutoRead(true);
451                 channel.read();
452             }
453             if (logger.isDebugEnabled()) {
454                 logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
455                         + isHandlerActive(ctx));
456             }
457         }
458     }
459 
460     /**
461      * Release the Read suspension
462      */
463     void releaseReadSuspended(ChannelHandlerContext ctx) {
464         Channel channel = ctx.channel();
465         channel.attr(READ_SUSPENDED).set(false);
466         channel.config().setAutoRead(true);
467     }
468 
469     @Override
470     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
471         long size = calculateSize(msg);
472         long now = TrafficCounter.milliSecondFromNano();
473         if (size > 0) {
474             // compute the number of ms to wait before reopening the channel
475             long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
476             wait = checkWaitReadTime(ctx, wait, now);
477             if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
478                 // time in order to try to limit the traffic
479                 // Only AutoRead AND HandlerActive True means Context Active
480                 Channel channel = ctx.channel();
481                 ChannelConfig config = channel.config();
482                 if (logger.isDebugEnabled()) {
483                     logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
484                             + isHandlerActive(ctx));
485                 }
486                 if (config.isAutoRead() && isHandlerActive(ctx)) {
487                     config.setAutoRead(false);
488                     channel.attr(READ_SUSPENDED).set(true);
489                     // Create a Runnable to reactive the read if needed. If one was create before it will just be
490                     // reused to limit object creation
491                     Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
492                     Runnable reopenTask = attr.get();
493                     if (reopenTask == null) {
494                         reopenTask = new ReopenReadTimerTask(ctx);
495                         attr.set(reopenTask);
496                     }
497                     ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
498                     if (logger.isDebugEnabled()) {
499                         logger.debug("Suspend final status => " + config.isAutoRead() + ':'
500                                 + isHandlerActive(ctx) + " will reopened at: " + wait);
501                     }
502                 }
503             }
504         }
505         informReadOperation(ctx, now);
506         ctx.fireChannelRead(msg);
507     }
508 
509     @Override
510     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
511         Channel channel = ctx.channel();
512         if (channel.hasAttr(REOPEN_TASK)) {
513             //release the reopen task
514             channel.attr(REOPEN_TASK).set(null);
515         }
516         super.handlerRemoved(ctx);
517     }
518 
519     /**
520      * Method overridden in GTSH to take into account specific timer for the channel.
521      * @param wait the wait delay computed in ms
522      * @param now the relative now time in ms
523      * @return the wait to use according to the context
524      */
525     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
526         // no change by default
527         return wait;
528     }
529 
530     /**
531      * Method overridden in GTSH to take into account specific timer for the channel.
532      * @param now the relative now time in ms
533      */
534     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
535         // default noop
536     }
537 
538     protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
539         Boolean suspended = ctx.channel().attr(READ_SUSPENDED).get();
540         return suspended == null || Boolean.FALSE.equals(suspended);
541     }
542 
543     @Override
544     public void read(ChannelHandlerContext ctx) {
545         if (isHandlerActive(ctx)) {
546             // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
547             ctx.read();
548         }
549     }
550 
551     @Override
552     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
553             throws Exception {
554         long size = calculateSize(msg);
555         long now = TrafficCounter.milliSecondFromNano();
556         if (size > 0) {
557             // compute the number of ms to wait before continue with the channel
558             long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
559             if (wait >= MINIMAL_WAIT) {
560                 if (logger.isDebugEnabled()) {
561                     logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
562                             + isHandlerActive(ctx));
563                 }
564                 submitWrite(ctx, msg, size, wait, now, promise);
565                 return;
566             }
567         }
568         // to maintain order of write
569         submitWrite(ctx, msg, size, 0, now, promise);
570     }
571 
572     @Deprecated
573     protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
574             final long delay, final ChannelPromise promise) {
575         submitWrite(ctx, msg, calculateSize(msg),
576                 delay, TrafficCounter.milliSecondFromNano(), promise);
577     }
578 
579     abstract void submitWrite(
580             ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
581 
582     @Override
583     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
584         setUserDefinedWritability(ctx, true);
585         super.channelRegistered(ctx);
586     }
587 
588     void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
589         ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
590         if (cob != null) {
591             cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
592         }
593     }
594 
595     /**
596      * Check the writability according to delay and size for the channel.
597      * Set if necessary setUserDefinedWritability status.
598      * @param delay the computed delay
599      * @param queueSize the current queueSize
600      */
601     void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
602         if (queueSize > maxWriteSize || delay > maxWriteDelay) {
603             setUserDefinedWritability(ctx, false);
604         }
605     }
606     /**
607      * Explicitly release the Write suspended status.
608      */
609     void releaseWriteSuspended(ChannelHandlerContext ctx) {
610         setUserDefinedWritability(ctx, true);
611     }
612 
613     /**
614      * @return the current TrafficCounter (if
615      *         channel is still connected)
616      */
617     public TrafficCounter trafficCounter() {
618         return trafficCounter;
619     }
620 
621     @Override
622     public String toString() {
623         StringBuilder builder = new StringBuilder(290)
624             .append("TrafficShaping with Write Limit: ").append(writeLimit)
625             .append(" Read Limit: ").append(readLimit)
626             .append(" CheckInterval: ").append(checkInterval)
627             .append(" maxDelay: ").append(maxWriteDelay)
628             .append(" maxSize: ").append(maxWriteSize)
629             .append(" and Counter: ");
630         if (trafficCounter != null) {
631             builder.append(trafficCounter);
632         } else {
633             builder.append("none");
634         }
635         return builder.toString();
636     }
637 
638     /**
639      * Calculate the size of the given {@link Object}.
640      *
641      * This implementation supports {@link ByteBuf}, {@link ByteBufHolder} and {@link FileRegion}.
642      * Sub-classes may override this.
643      * @param msg the msg for which the size should be calculated.
644      * @return size the size of the msg or {@code -1} if unknown.
645      */
646     protected long calculateSize(Object msg) {
647         if (msg instanceof ByteBuf) {
648             return ((ByteBuf) msg).readableBytes();
649         }
650         if (msg instanceof ByteBufHolder) {
651             return ((ByteBufHolder) msg).content().readableBytes();
652         }
653         if (msg instanceof FileRegion) {
654             return ((FileRegion) msg).count();
655         }
656         return -1;
657     }
658 }