View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import static io.netty.util.internal.ObjectUtil.checkNotNull;
19  import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE;
20  import io.netty.util.internal.logging.InternalLogger;
21  import io.netty.util.internal.logging.InternalLoggerFactory;
22  
23  import java.util.concurrent.ScheduledExecutorService;
24  import java.util.concurrent.ScheduledFuture;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  
29  /**
30   * Counts the number of read and written bytes for rate-limiting traffic.
31   * <p>
32   * It computes the statistics for both inbound and outbound traffic periodically at the given
33   * {@code checkInterval}, and calls the {@link AbstractTrafficShapingHandler#doAccounting(TrafficCounter)} method back.
34   * If the {@code checkInterval} is {@code 0}, no accounting will be done and statistics will only be computed at each
35   * receive or write operation.
36   * </p>
37   */
38  public class TrafficCounter {
39  
40      private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
41  
42      /**
43       * @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
44       */
45      public static long milliSecondFromNano() {
46          return System.nanoTime() / 1000000;
47      }
48  
49      /**
50       * Current written bytes
51       */
52      private final AtomicLong currentWrittenBytes = new AtomicLong();
53  
54      /**
55       * Current read bytes
56       */
57      private final AtomicLong currentReadBytes = new AtomicLong();
58  
59      /**
60       * Last writing time during current check interval
61       */
62      private long writingTime;
63  
64      /**
65       * Last reading delay during current check interval
66       */
67      private long readingTime;
68  
69      /**
70       * Long life written bytes
71       */
72      private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
73  
74      /**
75       * Long life read bytes
76       */
77      private final AtomicLong cumulativeReadBytes = new AtomicLong();
78  
79      /**
80       * Last time when the cumulative bytes were reset to zero: this is a real epoch time (informative only)
81       */
82      private long lastCumulativeTime;
83  
84      /**
85       * Last writing bandwidth
86       */
87      private long lastWriteThroughput;
88  
89      /**
90       * Last reading bandwidth
91       */
92      private long lastReadThroughput;
93  
94      /**
95       * Last Time Check taken
96       */
97      final AtomicLong lastTime = new AtomicLong();
98  
99      /**
100      * Last written bytes number during last check interval
101      */
102     private volatile long lastWrittenBytes;
103 
104     /**
105      * Last read bytes number during last check interval
106      */
107     private volatile long lastReadBytes;
108 
109     /**
110      * Last future writing time during last check interval
111      */
112     private volatile long lastWritingTime;
113 
114     /**
115      * Last reading time during last check interval
116      */
117     private volatile long lastReadingTime;
118 
119     /**
120      * Real written bytes
121      */
122     private final AtomicLong realWrittenBytes = new AtomicLong();
123 
124     /**
125      * Real writing bandwidth
126      */
127     private long realWriteThroughput;
128 
129     /**
130      * Delay between two captures
131      */
132     final AtomicLong checkInterval = new AtomicLong(
133             AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
134 
135     // default 1 s
136 
137     /**
138      * Name of this Monitor
139      */
140     final String name;
141 
142     /**
143      * The associated TrafficShapingHandler
144      */
145     final AbstractTrafficShapingHandler trafficShapingHandler;
146 
147     /**
148      * Executor that will run the monitor
149      */
150     final ScheduledExecutorService executor;
151     /**
152      * Monitor created once in start()
153      */
154     Runnable monitor;
155     /**
156      * used in stop() to cancel the timer
157      */
158     volatile ScheduledFuture<?> scheduledFuture;
159 
160     /**
161      * Is Monitor active
162      */
163     volatile boolean monitorActive;
164 
165     /**
166      * Class to implement monitoring at fix delay
167      *
168      */
169     private final class TrafficMonitoringTask implements Runnable {
170         @Override
171         public void run() {
172             if (!monitorActive) {
173                 return;
174             }
175             resetAccounting(milliSecondFromNano());
176             if (trafficShapingHandler != null) {
177                 trafficShapingHandler.doAccounting(TrafficCounter.this);
178             }
179         }
180     }
181 
182     /**
183      * Start the monitoring process.
184      */
185     public synchronized void start() {
186         if (monitorActive) {
187             return;
188         }
189         lastTime.set(milliSecondFromNano());
190         long localCheckInterval = checkInterval.get();
191         // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
192         if (localCheckInterval > 0 && executor != null) {
193             monitorActive = true;
194             monitor = new TrafficMonitoringTask();
195             scheduledFuture =
196                 executor.scheduleAtFixedRate(monitor, 0, localCheckInterval, TimeUnit.MILLISECONDS);
197         }
198     }
199 
200     /**
201      * Stop the monitoring process.
202      */
203     public synchronized void stop() {
204         if (!monitorActive) {
205             return;
206         }
207         monitorActive = false;
208         resetAccounting(milliSecondFromNano());
209         if (trafficShapingHandler != null) {
210             trafficShapingHandler.doAccounting(this);
211         }
212         if (scheduledFuture != null) {
213             scheduledFuture.cancel(true);
214         }
215     }
216 
217     /**
218      * Reset the accounting on Read and Write.
219      *
220      * @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for.
221      */
222     synchronized void resetAccounting(long newLastTime) {
223         long interval = newLastTime - lastTime.getAndSet(newLastTime);
224         if (interval == 0) {
225             // nothing to do
226             return;
227         }
228         if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
229             logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
230         }
231         lastReadBytes = currentReadBytes.getAndSet(0);
232         lastWrittenBytes = currentWrittenBytes.getAndSet(0);
233         lastReadThroughput = lastReadBytes * 1000 / interval;
234         // nb byte / checkInterval in ms * 1000 (1s)
235         lastWriteThroughput = lastWrittenBytes * 1000 / interval;
236         // nb byte / checkInterval in ms * 1000 (1s)
237         realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
238         lastWritingTime = Math.max(lastWritingTime, writingTime);
239         lastReadingTime = Math.max(lastReadingTime, readingTime);
240     }
241 
/**
 * Creates a counter that is not attached to any {@link AbstractTrafficShapingHandler}, using the
 * given {@link ScheduledExecutorService}, name and check interval.
 *
 * @param executor
 *            the underlying executor service for scheduling checks, might be null when used
 *            from {@link GlobalChannelTrafficCounter}.
 * @param name
 *            the name given to this monitor, must not be null.
 * @param checkInterval
 *            the interval in milliseconds between two computations.
 */
public TrafficCounter(ScheduledExecutorService executor, String name, long checkInterval) {
    this.name = checkNotNull(name, "name");
    this.executor = executor;
    // No handler: the periodic task and stop() skip the doAccounting callback.
    this.trafficShapingHandler = null;

    init(checkInterval);
}
262 
/**
 * Creates a counter attached to the {@link AbstractTrafficShapingHandler} that hosts it, using the
 * given {@link ScheduledExecutorService}, name and check interval.
 *
 * @param trafficShapingHandler
 *            the associated AbstractTrafficShapingHandler, must not be null.
 * @param executor
 *            the underlying executor service for scheduling checks, might be null when used
 *            from {@link GlobalChannelTrafficCounter}.
 * @param name
 *            the name given to this monitor, must not be null.
 * @param checkInterval
 *            the interval in milliseconds between two computations.
 */
public TrafficCounter(
        AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
        String name, long checkInterval) {
    // The name is validated first, then the handler.
    this.name = checkNotNull(name, "name");
    this.trafficShapingHandler = checkNotNullWithIAE(trafficShapingHandler, "trafficShapingHandler");
    this.executor = executor;

    init(checkInterval);
}
286 
287     private void init(long checkInterval) {
288         // absolute time: informative only
289         lastCumulativeTime = System.currentTimeMillis();
290         writingTime = milliSecondFromNano();
291         readingTime = writingTime;
292         lastWritingTime = writingTime;
293         lastReadingTime = writingTime;
294         configure(checkInterval);
295     }
296 
297     /**
298      * Change checkInterval between two computations in millisecond.
299      *
300      * @param newCheckInterval The new check interval (in milliseconds)
301      */
302     public void configure(long newCheckInterval) {
303         long newInterval = newCheckInterval / 10 * 10;
304         if (checkInterval.getAndSet(newInterval) != newInterval) {
305             if (newInterval <= 0) {
306                 stop();
307                 // No more active monitoring
308                 lastTime.set(milliSecondFromNano());
309             } else {
310                 // Restart
311                 stop();
312                 start();
313             }
314         }
315     }
316 
317     /**
318      * Computes counters for Read.
319      *
320      * @param recv
321      *            the size in bytes to read
322      */
323     void bytesRecvFlowControl(long recv) {
324         currentReadBytes.addAndGet(recv);
325         cumulativeReadBytes.addAndGet(recv);
326     }
327 
328     /**
329      * Computes counters for Write.
330      *
331      * @param write
332      *            the size in bytes to write
333      */
334     void bytesWriteFlowControl(long write) {
335         currentWrittenBytes.addAndGet(write);
336         cumulativeWrittenBytes.addAndGet(write);
337     }
338 
339     /**
340      * Computes counters for Real Write.
341      *
342      * @param write
343      *            the size in bytes to write
344      */
345     void bytesRealWriteFlowControl(long write) {
346         realWrittenBytes.addAndGet(write);
347     }
348 
349     /**
350      * @return the current checkInterval between two computations of traffic counter
351      *         in millisecond.
352      */
353     public long checkInterval() {
354         return checkInterval.get();
355     }
356 
357     /**
358      * @return the Read Throughput in bytes/s computes in the last check interval.
359      */
360     public long lastReadThroughput() {
361         return lastReadThroughput;
362     }
363 
364     /**
365      * @return the Write Throughput in bytes/s computes in the last check interval.
366      */
367     public long lastWriteThroughput() {
368         return lastWriteThroughput;
369     }
370 
371     /**
372      * @return the number of bytes read during the last check Interval.
373      */
374     public long lastReadBytes() {
375         return lastReadBytes;
376     }
377 
378     /**
379      * @return the number of bytes written during the last check Interval.
380      */
381     public long lastWrittenBytes() {
382         return lastWrittenBytes;
383     }
384 
385     /**
386      * @return the current number of bytes read since the last checkInterval.
387      */
388     public long currentReadBytes() {
389         return currentReadBytes.get();
390     }
391 
392     /**
393      * @return the current number of bytes written since the last check Interval.
394      */
395     public long currentWrittenBytes() {
396         return currentWrittenBytes.get();
397     }
398 
399     /**
400      * @return the Time in millisecond of the last check as of System.currentTimeMillis().
401      */
402     public long lastTime() {
403         return lastTime.get();
404     }
405 
406     /**
407      * @return the cumulativeWrittenBytes
408      */
409     public long cumulativeWrittenBytes() {
410         return cumulativeWrittenBytes.get();
411     }
412 
413     /**
414      * @return the cumulativeReadBytes
415      */
416     public long cumulativeReadBytes() {
417         return cumulativeReadBytes.get();
418     }
419 
420     /**
421      * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
422      * when the cumulative counters were reset to 0.
423      */
424     public long lastCumulativeTime() {
425         return lastCumulativeTime;
426     }
427 
428     /**
429      * @return the realWrittenBytes
430      */
431     public AtomicLong getRealWrittenBytes() {
432         return realWrittenBytes;
433     }
434 
435     /**
436      * @return the realWriteThroughput
437      */
438     public long getRealWriteThroughput() {
439         return realWriteThroughput;
440     }
441 
442     /**
443      * Reset both read and written cumulative bytes counters and the associated absolute time
444      * from System.currentTimeMillis().
445      */
446     public void resetCumulativeTime() {
447         lastCumulativeTime = System.currentTimeMillis();
448         cumulativeReadBytes.set(0);
449         cumulativeWrittenBytes.set(0);
450     }
451 
452     /**
453      * @return the name of this TrafficCounter.
454      */
455     public String name() {
456         return name;
457     }
458 
459     /**
460      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
461      * time.
462      *
463      * @param size
464      *            the recv size
465      * @param limitTraffic
466      *            the traffic limit in bytes per second.
467      * @param maxTime
468      *            the max time in ms to wait in case of excess of traffic.
469      * @return the current time to wait (in ms) if needed for Read operation.
470      */
471     @Deprecated
472     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
473         return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
474     }
475 
476     /**
477      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
478      * time.
479      *
480      * @param size
481      *            the recv size
482      * @param limitTraffic
483      *            the traffic limit in bytes per second
484      * @param maxTime
485      *            the max time in ms to wait in case of excess of traffic.
486      * @param now the current time
487      * @return the current time to wait (in ms) if needed for Read operation.
488      */
489     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
490         bytesRecvFlowControl(size);
491         if (size == 0 || limitTraffic == 0) {
492             return 0;
493         }
494         final long lastTimeCheck = lastTime.get();
495         long sum = currentReadBytes.get();
496         long localReadingTime = readingTime;
497         long lastRB = lastReadBytes;
498         final long interval = now - lastTimeCheck;
499         long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
500         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
501             // Enough interval time to compute shaping
502             long time = sum * 1000 / limitTraffic - interval + pastDelay;
503             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
504                 if (logger.isDebugEnabled()) {
505                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
506                 }
507                 if (time > maxTime && now + time - localReadingTime > maxTime) {
508                     time = maxTime;
509                 }
510                 readingTime = Math.max(localReadingTime, now + time);
511                 return time;
512             }
513             readingTime = Math.max(localReadingTime, now);
514             return 0;
515         }
516         // take the last read interval check to get enough interval time
517         long lastsum = sum + lastRB;
518         long lastinterval = interval + checkInterval.get();
519         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
520         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
521             if (logger.isDebugEnabled()) {
522                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
523             }
524             if (time > maxTime && now + time - localReadingTime > maxTime) {
525                 time = maxTime;
526             }
527             readingTime = Math.max(localReadingTime, now + time);
528             return time;
529         }
530         readingTime = Math.max(localReadingTime, now);
531         return 0;
532     }
533 
534     /**
535      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
536      * the max wait time.
537      *
538      * @param size
539      *            the write size
540      * @param limitTraffic
541      *            the traffic limit in bytes per second.
542      * @param maxTime
543      *            the max time in ms to wait in case of excess of traffic.
544      * @return the current time to wait (in ms) if needed for Write operation.
545      */
546     @Deprecated
547     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
548         return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
549     }
550 
551     /**
552      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
553      * the max wait time.
554      *
555      * @param size
556      *            the write size
557      * @param limitTraffic
558      *            the traffic limit in bytes per second.
559      * @param maxTime
560      *            the max time in ms to wait in case of excess of traffic.
561      * @param now the current time
562      * @return the current time to wait (in ms) if needed for Write operation.
563      */
564     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
565         bytesWriteFlowControl(size);
566         if (size == 0 || limitTraffic == 0) {
567             return 0;
568         }
569         final long lastTimeCheck = lastTime.get();
570         long sum = currentWrittenBytes.get();
571         long lastWB = lastWrittenBytes;
572         long localWritingTime = writingTime;
573         long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
574         final long interval = now - lastTimeCheck;
575         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
576             // Enough interval time to compute shaping
577             long time = sum * 1000 / limitTraffic - interval + pastDelay;
578             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
579                 if (logger.isDebugEnabled()) {
580                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
581                 }
582                 if (time > maxTime && now + time - localWritingTime > maxTime) {
583                     time = maxTime;
584                 }
585                 writingTime = Math.max(localWritingTime, now + time);
586                 return time;
587             }
588             writingTime = Math.max(localWritingTime, now);
589             return 0;
590         }
591         // take the last write interval check to get enough interval time
592         long lastsum = sum + lastWB;
593         long lastinterval = interval + checkInterval.get();
594         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
595         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
596             if (logger.isDebugEnabled()) {
597                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
598             }
599             if (time > maxTime && now + time - localWritingTime > maxTime) {
600                 time = maxTime;
601             }
602             writingTime = Math.max(localWritingTime, now + time);
603             return time;
604         }
605         writingTime = Math.max(localWritingTime, now);
606         return 0;
607     }
608 
609     @Override
610     public String toString() {
611         return new StringBuilder(165).append("Monitor ").append(name)
612                 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
613                 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
614                 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
615                 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
616                 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
617                 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
618     }
619 }