View Javadoc

1   /*
2    * Copyright 2011 The Netty Project
3    * The Netty Project licenses this file to you under the Apache License,
4    * version 2.0 (the "License"); you may not use this file except in compliance
5    * with the License. You may obtain a copy of the License at:
6    * http://www.apache.org/licenses/LICENSE-2.0
7    * Unless required by applicable law or agreed to in writing, software
8    * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9    * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10   * License for the specific language governing permissions and limitations
11   * under the License.
12   */
13  package io.netty.handler.traffic;
14  
15  import io.netty.buffer.ByteBuf;
16  import io.netty.buffer.ByteBufHolder;
17  import io.netty.channel.ChannelDuplexHandler;
18  import io.netty.channel.ChannelConfig;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.util.Attribute;
23  import io.netty.util.AttributeKey;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.util.concurrent.TimeUnit;
28  
29  /**
30   * <p>AbstractTrafficShapingHandler allows you to limit the global bandwidth
31   * (see {@link GlobalTrafficShapingHandler}) or the per-session
32   * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
33   * It also lets you implement near-real-time monitoring of the bandwidth using
34   * the monitors from {@link TrafficCounter}, which call back the method
35   * doAccounting of this handler every checkInterval.</p>
36   *
37   * <p>If you want for any particular reasons to stop the monitoring (accounting) or to change
38   * the read/write limit or the check interval, several methods allow that for you:</p>
39   * <ul>
40   * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
41   * <li><tt>trafficCounter</tt> gives you access to the TrafficCounter, and so lets you stop or start the
42   * monitoring, change the checkInterval directly, or read its values.</li>
43   * </ul>
44   */
45  public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler {
    /** Logger shared by this handler and its nested read-reopen task. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);

    /**
     * Default delay between two checks, in milliseconds: 1s
     */
    public static final long DEFAULT_CHECK_INTERVAL = 1000;

    /**
     * Default max delay, in milliseconds, in case of traffic shaping
     * (during which no communication will occur).
     * Shall be less than TIMEOUT. Here half of "standard" 30s
     */
    public static final long DEFAULT_MAX_TIME = 15000;

    /**
     * Default max size, in bytes, to not exceed in buffer (write only): 4 MB.
     */
    static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;

    /**
     * Default minimal time to wait, in milliseconds. A computed wait below this
     * value neither suspends reads nor delays writes.
     */
    static final long MINIMAL_WAIT = 10;
69  
    /**
     * Traffic Counter. It has no initializer, so it stays {@code null} until one is assigned;
     * the configuration methods of this class null-check it before use.
     */
    protected TrafficCounter trafficCounter;

    /**
     * Limit in B/s to apply to write (the "no limit" constructors pass 0)
     */
    private volatile long writeLimit;

    /**
     * Limit in B/s to apply to read (the "no limit" constructors pass 0)
     */
    private volatile long readLimit;

    /**
     * Max delay in wait, in milliseconds
     */
    protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s

    /**
     * Delay between two performance snapshots, in milliseconds
     */
    protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
94  
    /**
     * Attribute holding {@code true} while this handler has suspended reading.
     * An absent value or {@code false} means reading is not suspended by the handler.
     */
    static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
            .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
    /**
     * Attribute caching the task that re-enables reading, so that the same instance
     * is reused on later suspensions instead of being re-created.
     */
    static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
            .getName() + ".REOPEN_TASK");

    /**
     * Max time to delay (in milliseconds) before proposing to stop writing new objects from next handlers
     */
    volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
    /**
     * Max size in the list (in bytes) before proposing to stop writing new objects from next handlers
     */
    volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
108 
    /**
     * Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
     * Assigned once, in the four-argument constructor, from {@link #userDefinedWritabilityIndex()}.
     * Must be between 1 and 31
     */
    final int userDefinedWritabilityIndex;

    /**
     * Default value for Channel UserDefinedWritability index
     */
    static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;

    /**
     * Default value for Global UserDefinedWritability index
     */
    static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;

    /**
     * Default value for GlobalChannel UserDefinedWritability index
     */
    static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
129 
130     /**
131      * @param newTrafficCounter
132      *            the TrafficCounter to set
133      */
134     void setTrafficCounter(TrafficCounter newTrafficCounter) {
135         trafficCounter = newTrafficCounter;
136     }
137 
138     /**
139      * @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
140      *              For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
141      *              for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
142      *              for GlobalChannel TSH it is defined as
143      *              {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
144      */
145     protected int userDefinedWritabilityIndex() {
146         return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
147     }
148 
149     /**
150      * @param writeLimit
151      *            0 or a limit in bytes/s
152      * @param readLimit
153      *            0 or a limit in bytes/s
154      * @param checkInterval
155      *            The delay between two computations of performances for
156      *            channels or 0 if no stats are to be computed.
157      * @param maxTime
158      *            The maximum delay to wait in case of traffic excess.
159      *            Must be positive.
160      */
161     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
162         if (maxTime <= 0) {
163             throw new IllegalArgumentException("maxTime must be positive");
164         }
165 
166         userDefinedWritabilityIndex = userDefinedWritabilityIndex();
167         this.writeLimit = writeLimit;
168         this.readLimit = readLimit;
169         this.checkInterval = checkInterval;
170         this.maxTime = maxTime;
171     }
172 
173     /**
174      * Constructor using default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
175      * @param writeLimit
176      *            0 or a limit in bytes/s
177      * @param readLimit
178      *            0 or a limit in bytes/s
179      * @param checkInterval
180      *            The delay between two computations of performances for
181      *            channels or 0 if no stats are to be computed.
182      */
183     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
184         this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
185     }
186 
187     /**
188      * Constructor using default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
189      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
190      *
191      * @param writeLimit
192      *            0 or a limit in bytes/s
193      * @param readLimit
194      *            0 or a limit in bytes/s
195      */
196     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
197         this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
198     }
199 
200     /**
201      * Constructor using NO LIMIT, default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
202      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
203      */
204     protected AbstractTrafficShapingHandler() {
205         this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
206     }
207 
208     /**
209      * Constructor using NO LIMIT and
210      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
211      *
212      * @param checkInterval
213      *            The delay between two computations of performances for
214      *            channels or 0 if no stats are to be computed.
215      */
216     protected AbstractTrafficShapingHandler(long checkInterval) {
217         this(0, 0, checkInterval, DEFAULT_MAX_TIME);
218     }
219 
220     /**
221      * Change the underlying limitations and check interval.
222      * <p>Note the change will be taken as best effort, meaning
223      * that all already scheduled traffics will not be
224      * changed, but only applied to new traffics.</p>
225      * <p>So the expected usage of this method is to be used not too often,
226      * accordingly to the traffic shaping configuration.</p>
227      *
228      * @param newWriteLimit
229      *            The new write limit (in bytes)
230      * @param newReadLimit
231      *            The new read limit (in bytes)
232      * @param newCheckInterval
233      *            The new check interval (in milliseconds)
234      */
235     public void configure(long newWriteLimit, long newReadLimit, long newCheckInterval) {
236         configure(newWriteLimit, newReadLimit);
237         configure(newCheckInterval);
238     }
239 
240     /**
241      * Change the underlying limitations.
242      * <p>Note the change will be taken as best effort, meaning
243      * that all already scheduled traffics will not be
244      * changed, but only applied to new traffics.</p>
245      * <p>So the expected usage of this method is to be used not too often,
246      * accordingly to the traffic shaping configuration.</p>
247      *
248      * @param newWriteLimit
249      *            The new write limit (in bytes)
250      * @param newReadLimit
251      *            The new read limit (in bytes)
252      */
253     public void configure(long newWriteLimit, long newReadLimit) {
254         writeLimit = newWriteLimit;
255         readLimit = newReadLimit;
256         if (trafficCounter != null) {
257             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
258         }
259     }
260 
261     /**
262      * Change the check interval.
263      *
264      * @param newCheckInterval
265      *            The new check interval (in milliseconds)
266      */
267     public void configure(long newCheckInterval) {
268         checkInterval = newCheckInterval;
269         if (trafficCounter != null) {
270             trafficCounter.configure(checkInterval);
271         }
272     }
273 
274     /**
275      * @return the writeLimit
276      */
277     public long getWriteLimit() {
278         return writeLimit;
279     }
280 
281     /**
282      * <p>Note the change will be taken as best effort, meaning
283      * that all already scheduled traffics will not be
284      * changed, but only applied to new traffics.</p>
285      * <p>So the expected usage of this method is to be used not too often,
286      * accordingly to the traffic shaping configuration.</p>
287      *
288      * @param writeLimit the writeLimit to set
289      */
290     public void setWriteLimit(long writeLimit) {
291         this.writeLimit = writeLimit;
292         if (trafficCounter != null) {
293             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
294         }
295     }
296 
297     /**
298      * @return the readLimit
299      */
300     public long getReadLimit() {
301         return readLimit;
302     }
303 
304     /**
305      * <p>Note the change will be taken as best effort, meaning
306      * that all already scheduled traffics will not be
307      * changed, but only applied to new traffics.</p>
308      * <p>So the expected usage of this method is to be used not too often,
309      * accordingly to the traffic shaping configuration.</p>
310      *
311      * @param readLimit the readLimit to set
312      */
313     public void setReadLimit(long readLimit) {
314         this.readLimit = readLimit;
315         if (trafficCounter != null) {
316             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
317         }
318     }
319 
320     /**
321      * @return the checkInterval
322      */
323     public long getCheckInterval() {
324         return checkInterval;
325     }
326 
327     /**
328      * @param checkInterval the interval in ms between each step check to set, default value being 1000 ms.
329      */
330     public void setCheckInterval(long checkInterval) {
331         this.checkInterval = checkInterval;
332         if (trafficCounter != null) {
333             trafficCounter.configure(checkInterval);
334         }
335     }
336 
337     /**
338      * <p>Note the change will be taken as best effort, meaning
339      * that all already scheduled traffics will not be
340      * changed, but only applied to new traffics.</p>
341      * <p>So the expected usage of this method is to be used not too often,
342      * accordingly to the traffic shaping configuration.</p>
343      *
344      * @param maxTime
345      *            Max delay in wait, shall be less than TIME OUT in related protocol.
346      *            Must be positive.
347      */
348     public void setMaxTimeWait(long maxTime) {
349         if (maxTime <= 0) {
350             throw new IllegalArgumentException("maxTime must be positive");
351         }
352         this.maxTime = maxTime;
353     }
354 
355     /**
356      * @return the max delay in wait to prevent TIME OUT
357      */
358     public long getMaxTimeWait() {
359         return maxTime;
360     }
361 
362     /**
363      * @return the maxWriteDelay
364      */
365     public long getMaxWriteDelay() {
366         return maxWriteDelay;
367     }
368 
369     /**
370      * <p>Note the change will be taken as best effort, meaning
371      * that all already scheduled traffics will not be
372      * changed, but only applied to new traffics.</p>
373      * <p>So the expected usage of this method is to be used not too often,
374      * accordingly to the traffic shaping configuration.</p>
375      *
376      * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
377      *              Must be positive.
378      */
379     public void setMaxWriteDelay(long maxWriteDelay) {
380         if (maxWriteDelay <= 0) {
381             throw new IllegalArgumentException("maxWriteDelay must be positive");
382         }
383         this.maxWriteDelay = maxWriteDelay;
384     }
385 
386     /**
387      * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
388      */
389     public long getMaxWriteSize() {
390         return maxWriteSize;
391     }
392 
393     /**
394      * <p>Note that this limit is a best effort on memory limitation to prevent Out Of
395      * Memory Exception. To ensure it works, the handler generating the write should
396      * use one of the way provided by Netty to handle the capacity:</p>
397      * <p>- the {@code Channel.isWritable()} property and the corresponding
398      * {@code channelWritabilityChanged()}</p>
399      * <p>- the {@code ChannelFuture.addListener(new GenericFutureListener())}</p>
400      *
401      * @param maxWriteSize the maximum Write Size allowed in the buffer
402      *            per channel before write suspended is set,
403      *            default being {@value #DEFAULT_MAX_SIZE} bytes.
404      */
405     public void setMaxWriteSize(long maxWriteSize) {
406         this.maxWriteSize = maxWriteSize;
407     }
408 
409     /**
410      * Called each time the accounting is computed from the TrafficCounters.
411      * This method could be used for instance to implement almost real time accounting.
412      *
413      * @param counter
414      *            the TrafficCounter that computes its performance
415      */
416     protected void doAccounting(TrafficCounter counter) {
417         // NOOP by default
418     }
419 
420     /**
421      * Class to implement setReadable at fix time
422      */
423     static final class ReopenReadTimerTask implements Runnable {
424         final ChannelHandlerContext ctx;
425 
426         ReopenReadTimerTask(ChannelHandlerContext ctx) {
427             this.ctx = ctx;
428         }
429 
430         @Override
431         public void run() {
432             ChannelConfig config = ctx.channel().config();
433             if (!config.isAutoRead() && isHandlerActive(ctx)) {
434                 // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
435                 // Then Just reset the status
436                 if (logger.isDebugEnabled()) {
437                     logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
438                             isHandlerActive(ctx));
439                 }
440                 ctx.attr(READ_SUSPENDED).set(false);
441             } else {
442                 // Anything else allows the handler to reset the AutoRead
443                 if (logger.isDebugEnabled()) {
444                     if (config.isAutoRead() && !isHandlerActive(ctx)) {
445                         logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
446                                 isHandlerActive(ctx));
447                     } else {
448                         logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
449                                 + isHandlerActive(ctx));
450                     }
451                 }
452                 ctx.attr(READ_SUSPENDED).set(false);
453                 config.setAutoRead(true);
454                 ctx.channel().read();
455             }
456             if (logger.isDebugEnabled()) {
457                 logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
458                         + isHandlerActive(ctx));
459             }
460         }
461     }
462 
463     /**
464      * Release the Read suspension
465      */
466     void releaseReadSuspended(ChannelHandlerContext ctx) {
467         ctx.attr(READ_SUSPENDED).set(false);
468         ctx.channel().config().setAutoRead(true);
469     }
470 
471     @Override
472     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
473         long size = calculateSize(msg);
474         long now = TrafficCounter.milliSecondFromNano();
475         if (size > 0) {
476             // compute the number of ms to wait before reopening the channel
477             long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
478             wait = checkWaitReadTime(ctx, wait, now);
479             if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
480                 // time in order to try to limit the traffic
481                 // Only AutoRead AND HandlerActive True means Context Active
482                 ChannelConfig config = ctx.channel().config();
483                 if (logger.isDebugEnabled()) {
484                     logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
485                             + isHandlerActive(ctx));
486                 }
487                 if (config.isAutoRead() && isHandlerActive(ctx)) {
488                     config.setAutoRead(false);
489                     ctx.attr(READ_SUSPENDED).set(true);
490                     // Create a Runnable to reactive the read if needed. If one was create before it will just be
491                     // reused to limit object creation
492                     Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
493                     Runnable reopenTask = attr.get();
494                     if (reopenTask == null) {
495                         reopenTask = new ReopenReadTimerTask(ctx);
496                         attr.set(reopenTask);
497                     }
498                     ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
499                     if (logger.isDebugEnabled()) {
500                         logger.debug("Suspend final status => " + config.isAutoRead() + ':'
501                                 + isHandlerActive(ctx) + " will reopened at: " + wait);
502                     }
503                 }
504             }
505         }
506         informReadOperation(ctx, now);
507         ctx.fireChannelRead(msg);
508     }
509 
510     /**
511      * Method overridden in GTSH to take into account specific timer for the channel.
512      * @param wait the wait delay computed in ms
513      * @param now the relative now time in ms
514      * @return the wait to use according to the context
515      */
516     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
517         // no change by default
518         return wait;
519     }
520 
521     /**
522      * Method overridden in GTSH to take into account specific timer for the channel.
523      * @param now the relative now time in ms
524      */
525     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
526         // default noop
527     }
528 
529     protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
530         Boolean suspended = ctx.attr(READ_SUSPENDED).get();
531         return suspended == null || Boolean.FALSE.equals(suspended);
532     }
533 
534     @Override
535     public void read(ChannelHandlerContext ctx) {
536         if (isHandlerActive(ctx)) {
537             // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
538             ctx.read();
539         }
540     }
541 
542     @Override
543     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
544             throws Exception {
545         long size = calculateSize(msg);
546         long now = TrafficCounter.milliSecondFromNano();
547         if (size > 0) {
548             // compute the number of ms to wait before continue with the channel
549             long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
550             if (wait >= MINIMAL_WAIT) {
551                 if (logger.isDebugEnabled()) {
552                     logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
553                             + isHandlerActive(ctx));
554                 }
555                 submitWrite(ctx, msg, size, wait, now, promise);
556                 return;
557             }
558         }
559         // to maintain order of write
560         submitWrite(ctx, msg, size, 0, now, promise);
561     }
562 
563     @Deprecated
564     protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
565             final long delay, final ChannelPromise promise) {
566         submitWrite(ctx, msg, calculateSize(msg),
567                 delay, TrafficCounter.milliSecondFromNano(), promise);
568     }
569 
    /**
     * Hands a write over to the concrete handler. {@code write} calls it for every message,
     * with a delay of 0 when no wait applies.
     *
     * @param ctx the context of the write
     * @param msg the message to write
     * @param size the size of the message as returned by {@code calculateSize(msg)}
     * @param delay the delay to apply in ms, or 0 for none
     * @param now the relative now time in ms, taken from {@code TrafficCounter.milliSecondFromNano()}
     * @param promise the promise of the write
     */
    abstract void submitWrite(
            ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
572 
573     @Override
574     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
575         setUserDefinedWritability(ctx, true);
576         super.channelRegistered(ctx);
577     }
578 
579     void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
580         ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
581         if (cob != null) {
582             cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
583         }
584     }
585 
586     /**
587      * Check the writability according to delay and size for the channel.
588      * Set if necessary setUserDefinedWritability status.
589      * @param delay the computed delay
590      * @param queueSize the current queueSize
591      */
592     void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
593         if (queueSize > maxWriteSize || delay > maxWriteDelay) {
594             setUserDefinedWritability(ctx, false);
595         }
596     }
597     /**
598      * Explicitly release the Write suspended status.
599      */
600     void releaseWriteSuspended(ChannelHandlerContext ctx) {
601         setUserDefinedWritability(ctx, true);
602     }
603 
604     /**
605      * @return the current TrafficCounter (if
606      *         channel is still connected)
607      */
608     public TrafficCounter trafficCounter() {
609         return trafficCounter;
610     }
611 
612     @Override
613     public String toString() {
614         StringBuilder builder = new StringBuilder(290)
615             .append("TrafficShaping with Write Limit: ").append(writeLimit)
616             .append(" Read Limit: ").append(readLimit)
617             .append(" CheckInterval: ").append(checkInterval)
618             .append(" maxDelay: ").append(maxWriteDelay)
619             .append(" maxSize: ").append(maxWriteSize)
620             .append(" and Counter: ");
621         if (trafficCounter != null) {
622             builder.append(trafficCounter);
623         } else {
624             builder.append("none");
625         }
626         return builder.toString();
627     }
628 
629     /**
630      * Calculate the size of the given {@link Object}.
631      *
632      * This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may override this.
633      *
634      * @param msg
635      *            the msg for which the size should be calculated.
636      * @return size the size of the msg or {@code -1} if unknown.
637      */
638     protected long calculateSize(Object msg) {
639         if (msg instanceof ByteBuf) {
640             return ((ByteBuf) msg).readableBytes();
641         }
642         if (msg instanceof ByteBufHolder) {
643             return ((ByteBufHolder) msg).content().readableBytes();
644         }
645         return -1;
646     }
647 }