View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import io.netty.util.internal.logging.InternalLogger;
19  import io.netty.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.concurrent.ScheduledExecutorService;
22  import java.util.concurrent.ScheduledFuture;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicLong;
25  
26  
27  /**
28   * Counts the number of read and written bytes for rate-limiting traffic.
29   * <p>
30   * It computes the statistics for both inbound and outbound traffic periodically at the given
31   * {@code checkInterval}, and calls the {@link AbstractTrafficShapingHandler#doAccounting(TrafficCounter)} method back.
32   * If the {@code checkInterval} is {@code 0}, no accounting will be done and statistics will only be computed at each
33   * receive or write operation.
34   * </p>
35   */
36  public class TrafficCounter {
37  
38      private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
39  
40      /**
41       * @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
42       */
43      public static long milliSecondFromNano() {
44          return System.nanoTime() / 1000000;
45      }
46  
47      /**
48       * Current written bytes
49       */
50      private final AtomicLong currentWrittenBytes = new AtomicLong();
51  
52      /**
53       * Current read bytes
54       */
55      private final AtomicLong currentReadBytes = new AtomicLong();
56  
57      /**
58       * Last writing time during current check interval
59       */
60      private long writingTime;
61  
62      /**
63       * Last reading delay during current check interval
64       */
65      private long readingTime;
66  
67      /**
68       * Long life written bytes
69       */
70      private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
71  
72      /**
73       * Long life read bytes
74       */
75      private final AtomicLong cumulativeReadBytes = new AtomicLong();
76  
77      /**
78       * Last Time where cumulative bytes where reset to zero: this time is a real EPOC time (informative only)
79       */
80      private long lastCumulativeTime;
81  
82      /**
83       * Last writing bandwidth
84       */
85      private long lastWriteThroughput;
86  
87      /**
88       * Last reading bandwidth
89       */
90      private long lastReadThroughput;
91  
92      /**
93       * Last Time Check taken
94       */
95      final AtomicLong lastTime = new AtomicLong();
96  
97      /**
98       * Last written bytes number during last check interval
99       */
100     private volatile long lastWrittenBytes;
101 
102     /**
103      * Last read bytes number during last check interval
104      */
105     private volatile long lastReadBytes;
106 
107     /**
108      * Last future writing time during last check interval
109      */
110     private volatile long lastWritingTime;
111 
112     /**
113      * Last reading time during last check interval
114      */
115     private volatile long lastReadingTime;
116 
117     /**
118      * Real written bytes
119      */
120     private final AtomicLong realWrittenBytes = new AtomicLong();
121 
122     /**
123      * Real writing bandwidth
124      */
125     private long realWriteThroughput;
126 
127     /**
128      * Delay between two captures
129      */
130     final AtomicLong checkInterval = new AtomicLong(
131             AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
132 
133     // default 1 s
134 
135     /**
136      * Name of this Monitor
137      */
138     final String name;
139 
140     /**
141      * The associated TrafficShapingHandler
142      */
143     final AbstractTrafficShapingHandler trafficShapingHandler;
144 
145     /**
146      * Executor that will run the monitor
147      */
148     final ScheduledExecutorService executor;
149     /**
150      * Monitor created once in start()
151      */
152     Runnable monitor;
153     /**
154      * used in stop() to cancel the timer
155      */
156     volatile ScheduledFuture<?> scheduledFuture;
157 
158     /**
159      * Is Monitor active
160      */
161     volatile boolean monitorActive;
162 
163     /**
164      * Class to implement monitoring at fix delay
165      *
166      */
167     private final class TrafficMonitoringTask implements Runnable {
168         @Override
169         public void run() {
170             if (!monitorActive) {
171                 return;
172             }
173             resetAccounting(milliSecondFromNano());
174             if (trafficShapingHandler != null) {
175                 trafficShapingHandler.doAccounting(TrafficCounter.this);
176             }
177             scheduledFuture = executor.schedule(this, checkInterval.get(), TimeUnit.MILLISECONDS);
178         }
179     }
180 
181     /**
182      * Start the monitoring process.
183      */
184     public synchronized void start() {
185         if (monitorActive) {
186             return;
187         }
188         lastTime.set(milliSecondFromNano());
189         long localCheckInterval = checkInterval.get();
190         // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
191         if (localCheckInterval > 0 && executor != null) {
192             monitorActive = true;
193             monitor = new TrafficMonitoringTask();
194             scheduledFuture =
195                 executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
196         }
197     }
198 
199     /**
200      * Stop the monitoring process.
201      */
202     public synchronized void stop() {
203         if (!monitorActive) {
204             return;
205         }
206         monitorActive = false;
207         resetAccounting(milliSecondFromNano());
208         if (trafficShapingHandler != null) {
209             trafficShapingHandler.doAccounting(this);
210         }
211         if (scheduledFuture != null) {
212             scheduledFuture.cancel(true);
213         }
214     }
215 
216     /**
217      * Reset the accounting on Read and Write.
218      *
219      * @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for.
220      */
221     synchronized void resetAccounting(long newLastTime) {
222         long interval = newLastTime - lastTime.getAndSet(newLastTime);
223         if (interval == 0) {
224             // nothing to do
225             return;
226         }
227         if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
228             logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
229         }
230         lastReadBytes = currentReadBytes.getAndSet(0);
231         lastWrittenBytes = currentWrittenBytes.getAndSet(0);
232         lastReadThroughput = lastReadBytes * 1000 / interval;
233         // nb byte / checkInterval in ms * 1000 (1s)
234         lastWriteThroughput = lastWrittenBytes * 1000 / interval;
235         // nb byte / checkInterval in ms * 1000 (1s)
236         realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
237         lastWritingTime = Math.max(lastWritingTime, writingTime);
238         lastReadingTime = Math.max(lastReadingTime, readingTime);
239     }
240 
241     /**
242      * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the {@link ScheduledExecutorService}
243      * to use, its name, the checkInterval between two computations in milliseconds.
244      *
245      * @param executor
246      *            the underlying executor service for scheduling checks, might be null when used
247      * from {@link GlobalChannelTrafficCounter}.
248      * @param name
249      *            the name given to this monitor.
250      * @param checkInterval
251      *            the checkInterval in millisecond between two computations.
252      */
253     public TrafficCounter(ScheduledExecutorService executor, String name, long checkInterval) {
254         if (name == null) {
255             throw new NullPointerException("name");
256         }
257 
258         trafficShapingHandler = null;
259         this.executor = executor;
260         this.name = name;
261 
262         init(checkInterval);
263     }
264 
265     /**
266      * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
267      * name, the checkInterval between two computations in millisecond.
268      *
269      * @param trafficShapingHandler
270      *            the associated AbstractTrafficShapingHandler.
271      * @param executor
272      *            the underlying executor service for scheduling checks, might be null when used
273      * from {@link GlobalChannelTrafficCounter}.
274      * @param name
275      *            the name given to this monitor.
276      * @param checkInterval
277      *            the checkInterval in millisecond between two computations.
278      */
279     public TrafficCounter(
280             AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
281             String name, long checkInterval) {
282 
283         if (trafficShapingHandler == null) {
284             throw new IllegalArgumentException("trafficShapingHandler");
285         }
286         if (name == null) {
287             throw new NullPointerException("name");
288         }
289 
290         this.trafficShapingHandler = trafficShapingHandler;
291         this.executor = executor;
292         this.name = name;
293 
294         init(checkInterval);
295     }
296 
297     private void init(long checkInterval) {
298         // absolute time: informative only
299         lastCumulativeTime = System.currentTimeMillis();
300         writingTime = milliSecondFromNano();
301         readingTime = writingTime;
302         lastWritingTime = writingTime;
303         lastReadingTime = writingTime;
304         configure(checkInterval);
305     }
306 
307     /**
308      * Change checkInterval between two computations in millisecond.
309      *
310      * @param newCheckInterval The new check interval (in milliseconds)
311      */
312     public void configure(long newCheckInterval) {
313         long newInterval = newCheckInterval / 10 * 10;
314         if (checkInterval.getAndSet(newInterval) != newInterval) {
315             if (newInterval <= 0) {
316                 stop();
317                 // No more active monitoring
318                 lastTime.set(milliSecondFromNano());
319             } else {
320                 // Start if necessary
321                 start();
322             }
323         }
324     }
325 
326     /**
327      * Computes counters for Read.
328      *
329      * @param recv
330      *            the size in bytes to read
331      */
332     void bytesRecvFlowControl(long recv) {
333         currentReadBytes.addAndGet(recv);
334         cumulativeReadBytes.addAndGet(recv);
335     }
336 
337     /**
338      * Computes counters for Write.
339      *
340      * @param write
341      *            the size in bytes to write
342      */
343     void bytesWriteFlowControl(long write) {
344         currentWrittenBytes.addAndGet(write);
345         cumulativeWrittenBytes.addAndGet(write);
346     }
347 
348     /**
349      * Computes counters for Real Write.
350      *
351      * @param write
352      *            the size in bytes to write
353      */
354     void bytesRealWriteFlowControl(long write) {
355         realWrittenBytes.addAndGet(write);
356     }
357 
358     /**
359      * @return the current checkInterval between two computations of traffic counter
360      *         in millisecond.
361      */
362     public long checkInterval() {
363         return checkInterval.get();
364     }
365 
366     /**
367      * @return the Read Throughput in bytes/s computes in the last check interval.
368      */
369     public long lastReadThroughput() {
370         return lastReadThroughput;
371     }
372 
373     /**
374      * @return the Write Throughput in bytes/s computes in the last check interval.
375      */
376     public long lastWriteThroughput() {
377         return lastWriteThroughput;
378     }
379 
380     /**
381      * @return the number of bytes read during the last check Interval.
382      */
383     public long lastReadBytes() {
384         return lastReadBytes;
385     }
386 
387     /**
388      * @return the number of bytes written during the last check Interval.
389      */
390     public long lastWrittenBytes() {
391         return lastWrittenBytes;
392     }
393 
394     /**
395      * @return the current number of bytes read since the last checkInterval.
396      */
397     public long currentReadBytes() {
398         return currentReadBytes.get();
399     }
400 
401     /**
402      * @return the current number of bytes written since the last check Interval.
403      */
404     public long currentWrittenBytes() {
405         return currentWrittenBytes.get();
406     }
407 
408     /**
409      * @return the Time in millisecond of the last check as of System.currentTimeMillis().
410      */
411     public long lastTime() {
412         return lastTime.get();
413     }
414 
415     /**
416      * @return the cumulativeWrittenBytes
417      */
418     public long cumulativeWrittenBytes() {
419         return cumulativeWrittenBytes.get();
420     }
421 
422     /**
423      * @return the cumulativeReadBytes
424      */
425     public long cumulativeReadBytes() {
426         return cumulativeReadBytes.get();
427     }
428 
429     /**
430      * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
431      * when the cumulative counters were reset to 0.
432      */
433     public long lastCumulativeTime() {
434         return lastCumulativeTime;
435     }
436 
437     /**
438      * @return the realWrittenBytes
439      */
440     public AtomicLong getRealWrittenBytes() {
441         return realWrittenBytes;
442     }
443 
444     /**
445      * @return the realWriteThroughput
446      */
447     public long getRealWriteThroughput() {
448         return realWriteThroughput;
449     }
450 
451     /**
452      * Reset both read and written cumulative bytes counters and the associated absolute time
453      * from System.currentTimeMillis().
454      */
455     public void resetCumulativeTime() {
456         lastCumulativeTime = System.currentTimeMillis();
457         cumulativeReadBytes.set(0);
458         cumulativeWrittenBytes.set(0);
459     }
460 
461     /**
462      * @return the name of this TrafficCounter.
463      */
464     public String name() {
465         return name;
466     }
467 
468     /**
469      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
470      * time.
471      *
472      * @param size
473      *            the recv size
474      * @param limitTraffic
475      *            the traffic limit in bytes per second.
476      * @param maxTime
477      *            the max time in ms to wait in case of excess of traffic.
478      * @return the current time to wait (in ms) if needed for Read operation.
479      */
480     @Deprecated
481     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
482         return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
483     }
484 
485     /**
486      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
487      * time.
488      *
489      * @param size
490      *            the recv size
491      * @param limitTraffic
492      *            the traffic limit in bytes per second
493      * @param maxTime
494      *            the max time in ms to wait in case of excess of traffic.
495      * @param now the current time
496      * @return the current time to wait (in ms) if needed for Read operation.
497      */
498     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
499         bytesRecvFlowControl(size);
500         if (size == 0 || limitTraffic == 0) {
501             return 0;
502         }
503         final long lastTimeCheck = lastTime.get();
504         long sum = currentReadBytes.get();
505         long localReadingTime = readingTime;
506         long lastRB = lastReadBytes;
507         final long interval = now - lastTimeCheck;
508         long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
509         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
510             // Enough interval time to compute shaping
511             long time = sum * 1000 / limitTraffic - interval + pastDelay;
512             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
513                 if (logger.isDebugEnabled()) {
514                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
515                 }
516                 if (time > maxTime && now + time - localReadingTime > maxTime) {
517                     time = maxTime;
518                 }
519                 readingTime = Math.max(localReadingTime, now + time);
520                 return time;
521             }
522             readingTime = Math.max(localReadingTime, now);
523             return 0;
524         }
525         // take the last read interval check to get enough interval time
526         long lastsum = sum + lastRB;
527         long lastinterval = interval + checkInterval.get();
528         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
529         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
530             if (logger.isDebugEnabled()) {
531                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
532             }
533             if (time > maxTime && now + time - localReadingTime > maxTime) {
534                 time = maxTime;
535             }
536             readingTime = Math.max(localReadingTime, now + time);
537             return time;
538         }
539         readingTime = Math.max(localReadingTime, now);
540         return 0;
541     }
542 
543     /**
544      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
545      * the max wait time.
546      *
547      * @param size
548      *            the write size
549      * @param limitTraffic
550      *            the traffic limit in bytes per second.
551      * @param maxTime
552      *            the max time in ms to wait in case of excess of traffic.
553      * @return the current time to wait (in ms) if needed for Write operation.
554      */
555     @Deprecated
556     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
557         return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
558     }
559 
560     /**
561      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
562      * the max wait time.
563      *
564      * @param size
565      *            the write size
566      * @param limitTraffic
567      *            the traffic limit in bytes per second.
568      * @param maxTime
569      *            the max time in ms to wait in case of excess of traffic.
570      * @param now the current time
571      * @return the current time to wait (in ms) if needed for Write operation.
572      */
573     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
574         bytesWriteFlowControl(size);
575         if (size == 0 || limitTraffic == 0) {
576             return 0;
577         }
578         final long lastTimeCheck = lastTime.get();
579         long sum = currentWrittenBytes.get();
580         long lastWB = lastWrittenBytes;
581         long localWritingTime = writingTime;
582         long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
583         final long interval = now - lastTimeCheck;
584         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
585             // Enough interval time to compute shaping
586             long time = sum * 1000 / limitTraffic - interval + pastDelay;
587             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
588                 if (logger.isDebugEnabled()) {
589                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
590                 }
591                 if (time > maxTime && now + time - localWritingTime > maxTime) {
592                     time = maxTime;
593                 }
594                 writingTime = Math.max(localWritingTime, now + time);
595                 return time;
596             }
597             writingTime = Math.max(localWritingTime, now);
598             return 0;
599         }
600         // take the last write interval check to get enough interval time
601         long lastsum = sum + lastWB;
602         long lastinterval = interval + checkInterval.get();
603         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
604         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
605             if (logger.isDebugEnabled()) {
606                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
607             }
608             if (time > maxTime && now + time - localWritingTime > maxTime) {
609                 time = maxTime;
610             }
611             writingTime = Math.max(localWritingTime, now + time);
612             return time;
613         }
614         writingTime = Math.max(localWritingTime, now);
615         return 0;
616     }
617 
618     @Override
619     public String toString() {
620         return new StringBuilder(165).append("Monitor ").append(name)
621                 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
622                 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
623                 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
624                 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
625                 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
626                 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
627     }
628 }