View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    * The Netty Project licenses this file to you under the Apache License,
4    * version 2.0 (the "License"); you may not use this file except in compliance
5    * with the License. You may obtain a copy of the License at:
6    * http://www.apache.org/licenses/LICENSE-2.0
7    * Unless required by applicable law or agreed to in writing, software
8    * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9    * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10   * License for the specific language governing permissions and limitations
11   * under the License.
12   */
13  package io.netty.handler.traffic;
14  
15  import io.netty.util.internal.logging.InternalLogger;
16  import io.netty.util.internal.logging.InternalLoggerFactory;
17  
18  import java.util.concurrent.ScheduledExecutorService;
19  import java.util.concurrent.ScheduledFuture;
20  import java.util.concurrent.TimeUnit;
21  import java.util.concurrent.atomic.AtomicLong;
22  
23  /**
24   * Counts the number of read and written bytes for rate-limiting traffic.
25   * <p>
26   * It computes the statistics for both inbound and outbound traffic periodically at the given
27   * {@code checkInterval}, and calls the {@link AbstractTrafficShapingHandler#doAccounting(TrafficCounter)} method back.
28   * If the {@code checkInterval} is {@code 0}, no accounting will be done and statistics will only be computed at each
29   * receive or write operation.
30   * </p>
31   */
32  public class TrafficCounter {
33  
34      private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
35  
36      /**
37       * @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
38       */
39      public static long milliSecondFromNano() {
40          return System.nanoTime() / 1000000;
41      }
42  
43      /**
44       * Current written bytes
45       */
46      private final AtomicLong currentWrittenBytes = new AtomicLong();
47  
48      /**
49       * Current read bytes
50       */
51      private final AtomicLong currentReadBytes = new AtomicLong();
52  
53      /**
54       * Last writing time during current check interval
55       */
56      private long writingTime;
57  
58      /**
59       * Last reading delay during current check interval
60       */
61      private long readingTime;
62  
63      /**
64       * Long life written bytes
65       */
66      private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
67  
68      /**
69       * Long life read bytes
70       */
71      private final AtomicLong cumulativeReadBytes = new AtomicLong();
72  
73      /**
74       * Last Time where cumulative bytes where reset to zero: this time is a real EPOC time (informative only)
75       */
76      private long lastCumulativeTime;
77  
78      /**
79       * Last writing bandwidth
80       */
81      private long lastWriteThroughput;
82  
83      /**
84       * Last reading bandwidth
85       */
86      private long lastReadThroughput;
87  
88      /**
89       * Last Time Check taken
90       */
91      final AtomicLong lastTime = new AtomicLong();
92  
93      /**
94       * Last written bytes number during last check interval
95       */
96      private volatile long lastWrittenBytes;
97  
98      /**
99       * Last read bytes number during last check interval
100      */
101     private volatile long lastReadBytes;
102 
103     /**
104      * Last future writing time during last check interval
105      */
106     private volatile long lastWritingTime;
107 
108     /**
109      * Last reading time during last check interval
110      */
111     private volatile long lastReadingTime;
112 
113     /**
114      * Real written bytes
115      */
116     private final AtomicLong realWrittenBytes = new AtomicLong();
117 
118     /**
119      * Real writing bandwidth
120      */
121     private long realWriteThroughput;
122 
123     /**
124      * Delay between two captures
125      */
126     final AtomicLong checkInterval = new AtomicLong(AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
127 
128     // default 1 s
129 
130     /**
131      * Name of this Monitor
132      */
133     final String name;
134 
135     /**
136      * The associated TrafficShapingHandler
137      */
138     final AbstractTrafficShapingHandler trafficShapingHandler;
139 
140     /**
141      * Executor that will run the monitor
142      */
143     final ScheduledExecutorService executor;
144     /**
145      * Monitor created once in start()
146      */
147     Runnable monitor;
148     /**
149      * used in stop() to cancel the timer
150      */
151     volatile ScheduledFuture<?> scheduledFuture;
152 
153     /**
154      * Is Monitor active
155      */
156     volatile boolean monitorActive;
157 
158     /**
159      * Class to implement monitoring at fix delay
160      *
161      */
162     private final class TrafficMonitoringTask implements Runnable {
163         @Override
164         public void run() {
165             if (!monitorActive) {
166                 return;
167             }
168             resetAccounting(milliSecondFromNano());
169             if (trafficShapingHandler != null) {
170                 trafficShapingHandler.doAccounting(TrafficCounter.this);
171             }
172             scheduledFuture = executor.schedule(this, checkInterval.get(), TimeUnit.MILLISECONDS);
173         }
174     }
175 
176     /**
177      * Start the monitoring process.
178      */
179     public synchronized void start() {
180         if (monitorActive) {
181             return;
182         }
183         lastTime.set(milliSecondFromNano());
184         long localCheckInterval = checkInterval.get();
185         // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
186         if (localCheckInterval > 0 && executor != null) {
187             monitorActive = true;
188             monitor = new TrafficMonitoringTask();
189             scheduledFuture =
190                 executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
191         }
192     }
193 
194     /**
195      * Stop the monitoring process.
196      */
197     public synchronized void stop() {
198         if (!monitorActive) {
199             return;
200         }
201         monitorActive = false;
202         resetAccounting(milliSecondFromNano());
203         if (trafficShapingHandler != null) {
204             trafficShapingHandler.doAccounting(this);
205         }
206         if (scheduledFuture != null) {
207             scheduledFuture.cancel(true);
208         }
209     }
210 
211     /**
212      * Reset the accounting on Read and Write.
213      *
214      * @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for.
215      */
216     synchronized void resetAccounting(long newLastTime) {
217         long interval = newLastTime - lastTime.getAndSet(newLastTime);
218         if (interval == 0) {
219             // nothing to do
220             return;
221         }
222         if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
223             logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
224         }
225         lastReadBytes = currentReadBytes.getAndSet(0);
226         lastWrittenBytes = currentWrittenBytes.getAndSet(0);
227         lastReadThroughput = lastReadBytes * 1000 / interval;
228         // nb byte / checkInterval in ms * 1000 (1s)
229         lastWriteThroughput = lastWrittenBytes * 1000 / interval;
230         // nb byte / checkInterval in ms * 1000 (1s)
231         realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
232         lastWritingTime = Math.max(lastWritingTime, writingTime);
233         lastReadingTime = Math.max(lastReadingTime, readingTime);
234     }
235 
236     /**
237      * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the {@link ScheduledExecutorService}
238      * to use, its name, the checkInterval between two computations in milliseconds.
239      *
240      * @param executor
241      *            the underlying executor service for scheduling checks, might be null when used
242      * from {@link GlobalChannelTrafficCounter}.
243      * @param name
244      *            the name given to this monitor.
245      * @param checkInterval
246      *            the checkInterval in millisecond between two computations.
247      */
248     public TrafficCounter(ScheduledExecutorService executor, String name, long checkInterval) {
249         if (name == null) {
250             throw new NullPointerException("name");
251         }
252 
253         trafficShapingHandler = null;
254         this.executor = executor;
255         this.name = name;
256 
257         init(checkInterval);
258     }
259 
260     /**
261      * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
262      * name, the checkInterval between two computations in millisecond.
263      *
264      * @param trafficShapingHandler
265      *            the associated AbstractTrafficShapingHandler.
266      * @param executor
267      *            the underlying executor service for scheduling checks, might be null when used
268      * from {@link GlobalChannelTrafficCounter}.
269      * @param name
270      *            the name given to this monitor.
271      * @param checkInterval
272      *            the checkInterval in millisecond between two computations.
273      */
274     public TrafficCounter(
275             AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
276             String name, long checkInterval) {
277 
278         if (trafficShapingHandler == null) {
279             throw new IllegalArgumentException("trafficShapingHandler");
280         }
281         if (name == null) {
282             throw new NullPointerException("name");
283         }
284 
285         this.trafficShapingHandler = trafficShapingHandler;
286         this.executor = executor;
287         this.name = name;
288 
289         init(checkInterval);
290     }
291 
292     private void init(long checkInterval) {
293         // absolute time: informative only
294         lastCumulativeTime = System.currentTimeMillis();
295         writingTime = milliSecondFromNano();
296         readingTime = writingTime;
297         lastWritingTime = writingTime;
298         lastReadingTime = writingTime;
299         configure(checkInterval);
300     }
301 
302     /**
303      * Change checkInterval between two computations in millisecond.
304      *
305      * @param newCheckInterval The new check interval (in milliseconds)
306      */
307     public void configure(long newCheckInterval) {
308         long newInterval = newCheckInterval / 10 * 10;
309         if (checkInterval.getAndSet(newInterval) != newInterval) {
310             if (newInterval <= 0) {
311                 stop();
312                 // No more active monitoring
313                 lastTime.set(milliSecondFromNano());
314             } else {
315                 // Start if necessary
316                 start();
317             }
318         }
319     }
320 
321     /**
322      * Computes counters for Read.
323      *
324      * @param recv
325      *            the size in bytes to read
326      */
327     void bytesRecvFlowControl(long recv) {
328         currentReadBytes.addAndGet(recv);
329         cumulativeReadBytes.addAndGet(recv);
330     }
331 
332     /**
333      * Computes counters for Write.
334      *
335      * @param write
336      *            the size in bytes to write
337      */
338     void bytesWriteFlowControl(long write) {
339         currentWrittenBytes.addAndGet(write);
340         cumulativeWrittenBytes.addAndGet(write);
341     }
342 
343     /**
344      * Computes counters for Real Write.
345      *
346      * @param write
347      *            the size in bytes to write
348      */
349     void bytesRealWriteFlowControl(long write) {
350         realWrittenBytes.addAndGet(write);
351     }
352 
353     /**
354      * @return the current checkInterval between two computations of traffic counter
355      *         in millisecond.
356      */
357     public long checkInterval() {
358         return checkInterval.get();
359     }
360 
361     /**
362      * @return the Read Throughput in bytes/s computes in the last check interval.
363      */
364     public long lastReadThroughput() {
365         return lastReadThroughput;
366     }
367 
368     /**
369      * @return the Write Throughput in bytes/s computes in the last check interval.
370      */
371     public long lastWriteThroughput() {
372         return lastWriteThroughput;
373     }
374 
375     /**
376      * @return the number of bytes read during the last check Interval.
377      */
378     public long lastReadBytes() {
379         return lastReadBytes;
380     }
381 
382     /**
383      * @return the number of bytes written during the last check Interval.
384      */
385     public long lastWrittenBytes() {
386         return lastWrittenBytes;
387     }
388 
389     /**
390      * @return the current number of bytes read since the last checkInterval.
391      */
392     public long currentReadBytes() {
393         return currentReadBytes.get();
394     }
395 
396     /**
397      * @return the current number of bytes written since the last check Interval.
398      */
399     public long currentWrittenBytes() {
400         return currentWrittenBytes.get();
401     }
402 
403     /**
404      * @return the Time in millisecond of the last check as of System.currentTimeMillis().
405      */
406     public long lastTime() {
407         return lastTime.get();
408     }
409 
410     /**
411      * @return the cumulativeWrittenBytes
412      */
413     public long cumulativeWrittenBytes() {
414         return cumulativeWrittenBytes.get();
415     }
416 
417     /**
418      * @return the cumulativeReadBytes
419      */
420     public long cumulativeReadBytes() {
421         return cumulativeReadBytes.get();
422     }
423 
424     /**
425      * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
426      *         when the cumulative counters were reset to 0.
427      */
428     public long lastCumulativeTime() {
429         return lastCumulativeTime;
430     }
431 
432     /**
433      * @return the realWrittenBytes
434      */
435     public AtomicLong getRealWrittenBytes() {
436         return realWrittenBytes;
437     }
438 
439     /**
440      * @return the realWriteThroughput
441      */
442     public long getRealWriteThroughput() {
443         return realWriteThroughput;
444     }
445 
446     /**
447      * Reset both read and written cumulative bytes counters and the associated absolute time
448      * from System.currentTimeMillis().
449      */
450     public void resetCumulativeTime() {
451         lastCumulativeTime = System.currentTimeMillis();
452         cumulativeReadBytes.set(0);
453         cumulativeWrittenBytes.set(0);
454     }
455 
456     /**
457      * @return the name of this TrafficCounter.
458      */
459     public String name() {
460         return name;
461     }
462 
463     /**
464      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
465      * time.
466      *
467      * @param size
468      *            the recv size
469      * @param limitTraffic
470      *            the traffic limit in bytes per second.
471      * @param maxTime
472      *            the max time in ms to wait in case of excess of traffic.
473      * @return the current time to wait (in ms) if needed for Read operation.
474      */
475     @Deprecated
476     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
477         return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
478     }
479 
480     /**
481      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
482      * time.
483      *
484      * @param size
485      *            the recv size
486      * @param limitTraffic
487      *            the traffic limit in bytes per second
488      * @param maxTime
489      *            the max time in ms to wait in case of excess of traffic.
490      * @param now the current time
491      * @return the current time to wait (in ms) if needed for Read operation.
492      */
493     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
494         bytesRecvFlowControl(size);
495         if (size == 0 || limitTraffic == 0) {
496             return 0;
497         }
498         final long lastTimeCheck = lastTime.get();
499         long sum = currentReadBytes.get();
500         long localReadingTime = readingTime;
501         long lastRB = lastReadBytes;
502         final long interval = now - lastTimeCheck;
503         long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
504         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
505             // Enough interval time to compute shaping
506             long time = sum * 1000 / limitTraffic - interval + pastDelay;
507             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
508                 if (logger.isDebugEnabled()) {
509                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
510                 }
511                 if (time > maxTime && now + time - localReadingTime > maxTime) {
512                     time = maxTime;
513                 }
514                 readingTime = Math.max(localReadingTime, now + time);
515                 return time;
516             }
517             readingTime = Math.max(localReadingTime, now);
518             return 0;
519         }
520         // take the last read interval check to get enough interval time
521         long lastsum = sum + lastRB;
522         long lastinterval = interval + checkInterval.get();
523         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
524         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
525             if (logger.isDebugEnabled()) {
526                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
527             }
528             if (time > maxTime && now + time - localReadingTime > maxTime) {
529                 time = maxTime;
530             }
531             readingTime = Math.max(localReadingTime, now + time);
532             return time;
533         }
534         readingTime = Math.max(localReadingTime, now);
535         return 0;
536     }
537 
538     /**
539      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
540      * the max wait time.
541      *
542      * @param size
543      *            the write size
544      * @param limitTraffic
545      *            the traffic limit in bytes per second.
546      * @param maxTime
547      *            the max time in ms to wait in case of excess of traffic.
548      * @return the current time to wait (in ms) if needed for Write operation.
549      */
550     @Deprecated
551     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
552         return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
553     }
554 
555     /**
556      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
557      * the max wait time.
558      *
559      * @param size
560      *            the write size
561      * @param limitTraffic
562      *            the traffic limit in bytes per second.
563      * @param maxTime
564      *            the max time in ms to wait in case of excess of traffic.
565      * @param now the current time
566      * @return the current time to wait (in ms) if needed for Write operation.
567      */
568     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
569         bytesWriteFlowControl(size);
570         if (size == 0 || limitTraffic == 0) {
571             return 0;
572         }
573         final long lastTimeCheck = lastTime.get();
574         long sum = currentWrittenBytes.get();
575         long lastWB = lastWrittenBytes;
576         long localWritingTime = writingTime;
577         long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
578         final long interval = now - lastTimeCheck;
579         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
580             // Enough interval time to compute shaping
581             long time = sum * 1000 / limitTraffic - interval + pastDelay;
582             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
583                 if (logger.isDebugEnabled()) {
584                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
585                 }
586                 if (time > maxTime && now + time - localWritingTime > maxTime) {
587                     time = maxTime;
588                 }
589                 writingTime = Math.max(localWritingTime, now + time);
590                 return time;
591             }
592             writingTime = Math.max(localWritingTime, now);
593             return 0;
594         }
595         // take the last write interval check to get enough interval time
596         long lastsum = sum + lastWB;
597         long lastinterval = interval + checkInterval.get();
598         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
599         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
600             if (logger.isDebugEnabled()) {
601                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
602             }
603             if (time > maxTime && now + time - localWritingTime > maxTime) {
604                 time = maxTime;
605             }
606             writingTime = Math.max(localWritingTime, now + time);
607             return time;
608         }
609         writingTime = Math.max(localWritingTime, now);
610         return 0;
611     }
612 
613     @Override
614     public String toString() {
615         return new StringBuilder(165).append("Monitor ").append(name)
616                 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
617                 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
618                 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
619                 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
620                 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
621                 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
622     }
623 }