View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import io.netty.util.internal.logging.InternalLogger;
19  import io.netty.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.concurrent.ScheduledExecutorService;
22  import java.util.concurrent.ScheduledFuture;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicLong;
25  
26  
27  /**
28   * TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.
29   *
30   * <p>A <tt>TrafficCounter</tt> counts the read and written bytes such that the
31   * {@link AbstractTrafficShapingHandler} can limit the traffic, globally or per channel.</p>
32   *
33   * <p>It computes the statistics for both read and written every {@link #checkInterval}, and calls
34   * back to its parent {@link AbstractTrafficShapingHandler#doAccounting} method.  If the checkInterval
35   * is set to 0, no accounting will be done and statistics will only be computed at each receive or
36   * write operation.</p>
37   */
38  public class TrafficCounter {
39      private static final InternalLogger logger =
40              InternalLoggerFactory.getInstance(TrafficCounter.class);
41  
42      /**
43       * @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
44       */
45      public static long milliSecondFromNano() {
46          return System.nanoTime() / 1000000;
47      }
48  
49      /**
50       * Current written bytes
51       */
52      private final AtomicLong currentWrittenBytes = new AtomicLong();
53  
54      /**
55       * Current read bytes
56       */
57      private final AtomicLong currentReadBytes = new AtomicLong();
58  
59      /**
60       * Last writing time during current check interval
61       */
62      private long writingTime;
63  
64      /**
65       * Last reading delay during current check interval
66       */
67      private long readingTime;
68  
69      /**
70       * Long life written bytes
71       */
72      private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
73  
74      /**
75       * Long life read bytes
76       */
77      private final AtomicLong cumulativeReadBytes = new AtomicLong();
78  
79      /**
80       * Last Time where cumulative bytes where reset to zero: this time is a real EPOC time (informative only)
81       */
82      private long lastCumulativeTime;
83  
84      /**
85       * Last writing bandwidth
86       */
87      private long lastWriteThroughput;
88  
89      /**
90       * Last reading bandwidth
91       */
92      private long lastReadThroughput;
93  
94      /**
95       * Last Time Check taken
96       */
97      final AtomicLong lastTime = new AtomicLong();
98  
99      /**
100      * Last written bytes number during last check interval
101      */
102     private volatile long lastWrittenBytes;
103 
104     /**
105      * Last read bytes number during last check interval
106      */
107     private volatile long lastReadBytes;
108 
109     /**
110      * Last future writing time during last check interval
111      */
112     private volatile long lastWritingTime;
113 
114     /**
115      * Last reading time during last check interval
116      */
117     private volatile long lastReadingTime;
118 
119     /**
120      * Real written bytes
121      */
122     private final AtomicLong realWrittenBytes = new AtomicLong();
123 
124     /**
125      * Real writing bandwidth
126      */
127     private long realWriteThroughput;
128 
129     /**
130      * Delay between two captures
131      */
132     final AtomicLong checkInterval = new AtomicLong(
133             AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
134 
135     // default 1 s
136 
137     /**
138      * Name of this Monitor
139      */
140     final String name;
141 
142     /**
143      * The associated TrafficShapingHandler
144      */
145     final AbstractTrafficShapingHandler trafficShapingHandler;
146 
147     /**
148      * Executor that will run the monitor
149      */
150     final ScheduledExecutorService executor;
151     /**
152      * Monitor created once in start()
153      */
154     Runnable monitor;
155     /**
156      * used in stop() to cancel the timer
157      */
158     volatile ScheduledFuture<?> scheduledFuture;
159 
160     /**
161      * Is Monitor active
162      */
163     volatile boolean monitorActive;
164 
165     /**
166      * Class to implement monitoring at fix delay
167      *
168      */
169     private static class TrafficMonitoringTask implements Runnable {
170         /**
171          * The associated TrafficShapingHandler
172          */
173         private final AbstractTrafficShapingHandler trafficShapingHandler1;
174 
175         /**
176          * The associated TrafficCounter
177          */
178         private final TrafficCounter counter;
179 
180         /**
181          * @param trafficShapingHandler
182          *            The parent handler to which this task needs to callback to for accounting.
183          * @param counter
184          *            The parent TrafficCounter that we need to reset the statistics for.
185          */
186         protected TrafficMonitoringTask(
187                 AbstractTrafficShapingHandler trafficShapingHandler,
188                 TrafficCounter counter) {
189             trafficShapingHandler1 = trafficShapingHandler;
190             this.counter = counter;
191         }
192 
193         @Override
194         public void run() {
195             if (!counter.monitorActive) {
196                 return;
197             }
198             counter.resetAccounting(milliSecondFromNano());
199             if (trafficShapingHandler1 != null) {
200                 trafficShapingHandler1.doAccounting(counter);
201             }
202             counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(),
203                                                                 TimeUnit.MILLISECONDS);
204         }
205     }
206 
207     /**
208      * Start the monitoring process.
209      */
210     public synchronized void start() {
211         if (monitorActive) {
212             return;
213         }
214         lastTime.set(milliSecondFromNano());
215         long localCheckInterval = checkInterval.get();
216         // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
217         if (localCheckInterval > 0 && executor != null) {
218             monitorActive = true;
219             monitor = new TrafficMonitoringTask(trafficShapingHandler, this);
220             scheduledFuture =
221                 executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
222         }
223     }
224 
225     /**
226      * Stop the monitoring process.
227      */
228     public synchronized void stop() {
229         if (!monitorActive) {
230             return;
231         }
232         monitorActive = false;
233         resetAccounting(milliSecondFromNano());
234         if (trafficShapingHandler != null) {
235             trafficShapingHandler.doAccounting(this);
236         }
237         if (scheduledFuture != null) {
238             scheduledFuture.cancel(true);
239         }
240     }
241 
242     /**
243      * Reset the accounting on Read and Write.
244      *
245      * @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for.
246      */
247     synchronized void resetAccounting(long newLastTime) {
248         long interval = newLastTime - lastTime.getAndSet(newLastTime);
249         if (interval == 0) {
250             // nothing to do
251             return;
252         }
253         if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
254             logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
255         }
256         lastReadBytes = currentReadBytes.getAndSet(0);
257         lastWrittenBytes = currentWrittenBytes.getAndSet(0);
258         lastReadThroughput = lastReadBytes * 1000 / interval;
259         // nb byte / checkInterval in ms * 1000 (1s)
260         lastWriteThroughput = lastWrittenBytes * 1000 / interval;
261         // nb byte / checkInterval in ms * 1000 (1s)
262         realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
263         lastWritingTime = Math.max(lastWritingTime, writingTime);
264         lastReadingTime = Math.max(lastReadingTime, readingTime);
265     }
266 
267     /**
268      * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
269      * name, the checkInterval between two computations in millisecond.
270      *
271      * @param trafficShapingHandler
272      *            the associated AbstractTrafficShapingHandler.
273      * @param executor
274      *            the underlying executor service for scheduling checks, might be null when used
275      * from {@link GlobalChannelTrafficCounter}.
276      * @param name
277      *            the name given to this monitor.
278      * @param checkInterval
279      *            the checkInterval in millisecond between two computations.
280      */
281     public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
282             String name, long checkInterval) {
283         if (trafficShapingHandler == null) {
284             throw new IllegalArgumentException("TrafficShapingHandler must not be null");
285         }
286         this.trafficShapingHandler = trafficShapingHandler;
287         this.executor = executor;
288         this.name = name;
289         // absolute time: informative only
290         lastCumulativeTime = System.currentTimeMillis();
291         writingTime = milliSecondFromNano();
292         readingTime = writingTime;
293         lastWritingTime = writingTime;
294         lastReadingTime = writingTime;
295         configure(checkInterval);
296     }
297 
298     /**
299      * Change checkInterval between two computations in millisecond.
300      *
301      * @param newcheckInterval The new check interval (in milliseconds)
302      */
303     public void configure(long newcheckInterval) {
304         long newInterval = newcheckInterval / 10 * 10;
305         if (checkInterval.getAndSet(newInterval) != newInterval) {
306             if (newInterval <= 0) {
307                 stop();
308                 // No more active monitoring
309                 lastTime.set(milliSecondFromNano());
310             } else {
311                 // Start if necessary
312                 start();
313             }
314         }
315     }
316 
317     /**
318      * Computes counters for Read.
319      *
320      * @param recv
321      *            the size in bytes to read
322      */
323     void bytesRecvFlowControl(long recv) {
324         currentReadBytes.addAndGet(recv);
325         cumulativeReadBytes.addAndGet(recv);
326     }
327 
328     /**
329      * Computes counters for Write.
330      *
331      * @param write
332      *            the size in bytes to write
333      */
334     void bytesWriteFlowControl(long write) {
335         currentWrittenBytes.addAndGet(write);
336         cumulativeWrittenBytes.addAndGet(write);
337     }
338 
339     /**
340      * Computes counters for Real Write.
341      *
342      * @param write
343      *            the size in bytes to write
344      */
345     void bytesRealWriteFlowControl(long write) {
346         realWrittenBytes.addAndGet(write);
347     }
348 
349     /**
350      * @return the current checkInterval between two computations of traffic counter
351      *         in millisecond.
352      */
353     public long checkInterval() {
354         return checkInterval.get();
355     }
356 
357     /**
358      * @return the Read Throughput in bytes/s computes in the last check interval.
359      */
360     public long lastReadThroughput() {
361         return lastReadThroughput;
362     }
363 
364     /**
365      * @return the Write Throughput in bytes/s computes in the last check interval.
366      */
367     public long lastWriteThroughput() {
368         return lastWriteThroughput;
369     }
370 
371     /**
372      * @return the number of bytes read during the last check Interval.
373      */
374     public long lastReadBytes() {
375         return lastReadBytes;
376     }
377 
378     /**
379      * @return the number of bytes written during the last check Interval.
380      */
381     public long lastWrittenBytes() {
382         return lastWrittenBytes;
383     }
384 
385     /**
386      * @return the current number of bytes read since the last checkInterval.
387      */
388     public long currentReadBytes() {
389         return currentReadBytes.get();
390     }
391 
392     /**
393      * @return the current number of bytes written since the last check Interval.
394      */
395     public long currentWrittenBytes() {
396         return currentWrittenBytes.get();
397     }
398 
399     /**
400      * @return the Time in millisecond of the last check as of System.currentTimeMillis().
401      */
402     public long lastTime() {
403         return lastTime.get();
404     }
405 
406     /**
407      * @return the cumulativeWrittenBytes
408      */
409     public long cumulativeWrittenBytes() {
410         return cumulativeWrittenBytes.get();
411     }
412 
413     /**
414      * @return the cumulativeReadBytes
415      */
416     public long cumulativeReadBytes() {
417         return cumulativeReadBytes.get();
418     }
419 
420     /**
421      * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
422      * when the cumulative counters were reset to 0.
423      */
424     public long lastCumulativeTime() {
425         return lastCumulativeTime;
426     }
427 
428     /**
429      * @return the realWrittenBytes
430      */
431     public AtomicLong getRealWrittenBytes() {
432         return realWrittenBytes;
433     }
434 
435     /**
436      * @return the realWriteThroughput
437      */
438     public long getRealWriteThroughput() {
439         return realWriteThroughput;
440     }
441 
442     /**
443      * Reset both read and written cumulative bytes counters and the associated absolute time
444      * from System.currentTimeMillis().
445      */
446     public void resetCumulativeTime() {
447         lastCumulativeTime = System.currentTimeMillis();
448         cumulativeReadBytes.set(0);
449         cumulativeWrittenBytes.set(0);
450     }
451 
452     /**
453      * @return the name of this TrafficCounter.
454      */
455     public String name() {
456         return name;
457     }
458 
459     /**
460      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
461      * time.
462      *
463      * @param size
464      *            the recv size
465      * @param limitTraffic
466      *            the traffic limit in bytes per second.
467      * @param maxTime
468      *            the max time in ms to wait in case of excess of traffic.
469      * @return the current time to wait (in ms) if needed for Read operation.
470      */
471     @Deprecated
472     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
473         return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
474     }
475 
476     /**
477      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
478      * time.
479      *
480      * @param size
481      *            the recv size
482      * @param limitTraffic
483      *            the traffic limit in bytes per second
484      * @param maxTime
485      *            the max time in ms to wait in case of excess of traffic.
486      * @param now the current time
487      * @return the current time to wait (in ms) if needed for Read operation.
488      */
489     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
490         bytesRecvFlowControl(size);
491         if (size == 0 || limitTraffic == 0) {
492             return 0;
493         }
494         final long lastTimeCheck = lastTime.get();
495         long sum = currentReadBytes.get();
496         long localReadingTime = readingTime;
497         long lastRB = lastReadBytes;
498         final long interval = now - lastTimeCheck;
499         long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
500         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
501             // Enough interval time to compute shaping
502             long time = sum * 1000 / limitTraffic - interval + pastDelay;
503             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
504                 if (logger.isDebugEnabled()) {
505                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
506                 }
507                 if (time > maxTime && now + time - localReadingTime > maxTime) {
508                     time = maxTime;
509                 }
510                 readingTime = Math.max(localReadingTime, now + time);
511                 return time;
512             }
513             readingTime = Math.max(localReadingTime, now);
514             return 0;
515         }
516         // take the last read interval check to get enough interval time
517         long lastsum = sum + lastRB;
518         long lastinterval = interval + checkInterval.get();
519         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
520         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
521             if (logger.isDebugEnabled()) {
522                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
523             }
524             if (time > maxTime && now + time - localReadingTime > maxTime) {
525                 time = maxTime;
526             }
527             readingTime = Math.max(localReadingTime, now + time);
528             return time;
529         }
530         readingTime = Math.max(localReadingTime, now);
531         return 0;
532     }
533 
534     /**
535      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
536      * the max wait time.
537      *
538      * @param size
539      *            the write size
540      * @param limitTraffic
541      *            the traffic limit in bytes per second.
542      * @param maxTime
543      *            the max time in ms to wait in case of excess of traffic.
544      * @return the current time to wait (in ms) if needed for Write operation.
545      */
546     @Deprecated
547     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
548         return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
549     }
550 
551     /**
552      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
553      * the max wait time.
554      *
555      * @param size
556      *            the write size
557      * @param limitTraffic
558      *            the traffic limit in bytes per second.
559      * @param maxTime
560      *            the max time in ms to wait in case of excess of traffic.
561      * @param now the current time
562      * @return the current time to wait (in ms) if needed for Write operation.
563      */
564     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
565         bytesWriteFlowControl(size);
566         if (size == 0 || limitTraffic == 0) {
567             return 0;
568         }
569         final long lastTimeCheck = lastTime.get();
570         long sum = currentWrittenBytes.get();
571         long lastWB = lastWrittenBytes;
572         long localWritingTime = writingTime;
573         long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
574         final long interval = now - lastTimeCheck;
575         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
576             // Enough interval time to compute shaping
577             long time = sum * 1000 / limitTraffic - interval + pastDelay;
578             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
579                 if (logger.isDebugEnabled()) {
580                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
581                 }
582                 if (time > maxTime && now + time - localWritingTime > maxTime) {
583                     time = maxTime;
584                 }
585                 writingTime = Math.max(localWritingTime, now + time);
586                 return time;
587             }
588             writingTime = Math.max(localWritingTime, now);
589             return 0;
590         }
591         // take the last write interval check to get enough interval time
592         long lastsum = sum + lastWB;
593         long lastinterval = interval + checkInterval.get();
594         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
595         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
596             if (logger.isDebugEnabled()) {
597                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
598             }
599             if (time > maxTime && now + time - localWritingTime > maxTime) {
600                 time = maxTime;
601             }
602             writingTime = Math.max(localWritingTime, now + time);
603             return time;
604         }
605         writingTime = Math.max(localWritingTime, now);
606         return 0;
607     }
608 
609     @Override
610     public String toString() {
611         return new StringBuilder(165).append("Monitor ").append(name)
612                 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
613                 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
614                 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
615                 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
616                 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
617                 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
618     }
619 }