/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.jboss.netty.handler.traffic;

import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
26  
27  /**
28   * <p>TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.</p>
29   *
30   * A TrafficCounter has for goal to count the traffic in order to enable to limit the traffic or not,
31   * globally or per channel. It compute statistics on read and written bytes at the specified
32   * interval and call back the {@link AbstractTrafficShapingHandler} doAccounting method at every
33   * specified interval. If this interval is set to 0, therefore no accounting will be done and only
34   * statistics will be computed at each receive or write operations.
35   */
36  public class TrafficCounter {
37      private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
38  
39      /**
40       * @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
41       */
42      public static long milliSecondFromNano() {
43          return System.nanoTime() / 1000000;
44      }
45  
46      /**
47       * Current written bytes
48       */
49      private final AtomicLong currentWrittenBytes = new AtomicLong();
50  
51      /**
52       * Current read bytes
53       */
54      private final AtomicLong currentReadBytes = new AtomicLong();
55  
56      /**
57       * Last writing time during current check interval
58       */
59      private long writingTime;
60  
61      /**
62       * Last reading delay during current check interval
63       */
64      private long readingTime;
65  
66      /**
67       * Long life written bytes
68       */
69      private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
70  
71      /**
72       * Long life read bytes
73       */
74      private final AtomicLong cumulativeReadBytes = new AtomicLong();
75  
76      /**
77       * Last Time where cumulative bytes where reset to zero: this time is a real EPOC time (informative only)
78       */
79      private long lastCumulativeTime;
80  
81      /**
82       * Last writing bandwidth
83       */
84      private long lastWriteThroughput;
85  
86      /**
87       * Last reading bandwidth
88       */
89      private long lastReadThroughput;
90  
91      /**
92       * Last Time Check taken
93       */
94      final AtomicLong lastTime = new AtomicLong();
95  
96      /**
97       * Last written bytes number during last check interval
98       */
99      private volatile long lastWrittenBytes;
100 
101     /**
102      * Last read bytes number during last check interval
103      */
104     private volatile long lastReadBytes;
105 
106     /**
107      * Last future writing time during last check interval
108      */
109     private volatile long lastWritingTime;
110 
111     /**
112      * Last reading time during last check interval
113      */
114     private volatile long lastReadingTime;
115 
116     /**
117      * Real written bytes
118      */
119     private final AtomicLong realWrittenBytes = new AtomicLong();
120 
121     /**
122      * Real writing bandwidth
123      */
124     private long realWriteThroughput;
125 
126     /**
127      * Delay between two captures
128      */
129     final AtomicLong checkInterval = new AtomicLong(
130             AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
131 
132     // default 1 s
133 
134     /**
135      * Name of this Monitor
136      */
137     final String name;
138 
139     /**
140      * The associated TrafficShapingHandler
141      */
142     final AbstractTrafficShapingHandler trafficShapingHandler;
143 
144     /**
145      * One Timer for all Counter
146      */
147     final Timer timer;  // replace executor
148 
149     /**
150      * Monitor created once in start()
151      */
152     TimerTask timerTask;
153 
154     /**
155      * used in stop() to cancel the timer
156      */
157     volatile Timeout timeout;
158 
159     /**
160      * Is Monitor active
161      */
162     volatile boolean monitorActive;
163 
164     /**
165      * Class to implement monitoring at fix delay
166      *
167      */
168     private static final class TrafficMonitoringTask implements TimerTask {
169         /**
170          * The associated TrafficShapingHandler
171          */
172         private final AbstractTrafficShapingHandler trafficShapingHandler1;
173 
174         /**
175          * The associated TrafficCounter
176          */
177         private final TrafficCounter counter;
178 
179         TrafficMonitoringTask(
180                 AbstractTrafficShapingHandler trafficShapingHandler,
181                 TrafficCounter counter) {
182             trafficShapingHandler1 = trafficShapingHandler;
183             this.counter = counter;
184         }
185 
186         public void run(Timeout timeout) throws Exception {
187             if (!counter.monitorActive) {
188                 return;
189             }
190             counter.resetAccounting(milliSecondFromNano());
191             if (trafficShapingHandler1 != null) {
192                 trafficShapingHandler1.doAccounting(counter);
193             }
194 
195             counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
196         }
197     }
198 
199     /**
200      * Start the monitoring process.
201      */
202     public void start() {
203         if (monitorActive) {
204             return;
205         }
206         lastTime.set(milliSecondFromNano());
207         // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
208         if (checkInterval.get() > 0 && timer != null) {
209             monitorActive = true;
210             timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
211             timeout =
212                 timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
213         }
214     }
215 
216     /**
217      * Stop the monitoring process.
218      */
219     public void stop() {
220         if (!monitorActive) {
221             return;
222         }
223         monitorActive = false;
224         resetAccounting(milliSecondFromNano());
225         if (trafficShapingHandler != null) {
226             trafficShapingHandler.doAccounting(this);
227         }
228         if (timeout != null) {
229             timeout.cancel();
230         }
231     }
232 
233     /**
234      * Reset the accounting on Read and Write.
235      */
236     void resetAccounting(long newLastTime) {
237         long interval = newLastTime - lastTime.getAndSet(newLastTime);
238         if (interval == 0) {
239             // nothing to do
240             return;
241         }
242         lastReadBytes = currentReadBytes.getAndSet(0);
243         lastWrittenBytes = currentWrittenBytes.getAndSet(0);
244         lastReadThroughput = lastReadBytes * 1000 / interval;
245         // nb byte / checkInterval in ms * 1000 (1s)
246         lastWriteThroughput = lastWrittenBytes * 1000 / interval;
247         // nb byte / checkInterval in ms * 1000 (1s)
248         realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
249         lastWritingTime = Math.max(lastWritingTime, writingTime);
250         lastReadingTime = Math.max(lastReadingTime, readingTime);
251     }
252 
253     /**
254      * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
255      * name, the checkInterval between two computations in millisecond.
256      * @param trafficShapingHandler the associated AbstractTrafficShapingHandler
257      * @param timer
258      *            Could be a HashedWheelTimer, might be null when used
259      *              from {@link GlobalChannelTrafficCounter}.
260      * @param name
261      *            the name given to this monitor
262      * @param checkInterval
263      *            the checkInterval in millisecond between two computations.
264      */
265     public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
266             Timer timer, String name, long checkInterval) {
267         if (trafficShapingHandler == null) {
268             throw new IllegalArgumentException("TrafficShapingHandler must not be null");
269         }
270         this.trafficShapingHandler = trafficShapingHandler;
271         this.timer = timer;
272         this.name = name;
273         // absolute time: informative only
274         lastCumulativeTime = System.currentTimeMillis();
275         writingTime = milliSecondFromNano();
276         readingTime = writingTime;
277         lastWritingTime = writingTime;
278         lastReadingTime = writingTime;
279         configure(checkInterval);
280     }
281 
282     /**
283      * Change checkInterval between
284      * two computations in millisecond.
285      */
286     public void configure(long newcheckInterval) {
287         long newInterval = newcheckInterval / 10 * 10;
288         if (checkInterval.getAndSet(newInterval) != newInterval) {
289             if (newInterval <= 0) {
290                 stop();
291                 // No more active monitoring
292                 lastTime.set(milliSecondFromNano());
293             } else {
294                 // Start if necessary
295                 start();
296             }
297         }
298     }
299 
300     /**
301      * Computes counters for Read.
302      *
303      * @param recv
304      *            the size in bytes to read
305      */
306     void bytesRecvFlowControl(long recv) {
307         currentReadBytes.addAndGet(recv);
308         cumulativeReadBytes.addAndGet(recv);
309     }
310 
311     /**
312      * Computes counters for Write.
313      *
314      * @param write
315      *            the size in bytes to write
316      */
317     void bytesWriteFlowControl(long write) {
318         currentWrittenBytes.addAndGet(write);
319         cumulativeWrittenBytes.addAndGet(write);
320     }
321 
322     /**
323      * Computes counters for Real Write.
324      *
325      * @param write
326      *            the size in bytes to write
327      */
328     void bytesRealWriteFlowControl(long write) {
329         realWrittenBytes.addAndGet(write);
330     }
331 
332     /**
333      * @return the current checkInterval between two computations of traffic counter
334      *         in millisecond.
335      */
336     public long getCheckInterval() {
337         return checkInterval.get();
338     }
339 
340     /**
341      * @return the Read Throughput in bytes/s computes in the last check interval.
342      */
343     public long getLastReadThroughput() {
344         return lastReadThroughput;
345     }
346 
347     /**
348      * @return the Write Throughput in bytes/s computes in the last check interval.
349      */
350     public long getLastWriteThroughput() {
351         return lastWriteThroughput;
352     }
353 
354     /**
355      * @return the number of bytes read during the last check Interval.
356      */
357     public long getLastReadBytes() {
358         return lastReadBytes;
359     }
360 
361     /**
362      * @return the number of bytes written during the last check Interval.
363      */
364     public long getLastWrittenBytes() {
365         return lastWrittenBytes;
366     }
367 
368     /**
369     * @return the current number of bytes read since the last checkInterval.
370     */
371     public long getCurrentReadBytes() {
372         return currentReadBytes.get();
373     }
374 
375     /**
376      * @return the current number of bytes written since the last check Interval.
377      */
378     public long getCurrentWrittenBytes() {
379         return currentWrittenBytes.get();
380     }
381 
382     /**
383      * @return the Time in millisecond of the last check as of System.currentTimeMillis().
384      */
385     public long getLastTime() {
386         return lastTime.get();
387     }
388 
389     /**
390      * @return the cumulativeWrittenBytes
391      */
392     public long getCumulativeWrittenBytes() {
393         return cumulativeWrittenBytes.get();
394     }
395 
396     /**
397      * @return the cumulativeReadBytes
398      */
399     public long getCumulativeReadBytes() {
400         return cumulativeReadBytes.get();
401     }
402 
403     /**
404      * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
405      * when the cumulative counters were reset to 0.
406      */
407     public long getLastCumulativeTime() {
408         return lastCumulativeTime;
409     }
410 
411     /**
412      * @return the realWrittenBytes
413      */
414     public AtomicLong getRealWrittenBytes() {
415         return realWrittenBytes;
416     }
417 
418     /**
419      * @return the realWriteThroughput
420      */
421     public long getRealWriteThroughput() {
422         return realWriteThroughput;
423     }
424 
425     /**
426      * Reset both read and written cumulative bytes counters and the associated absolute time
427      * from System.currentTimeMillis().
428      */
429     public void resetCumulativeTime() {
430         lastCumulativeTime = System.currentTimeMillis();
431         cumulativeReadBytes.set(0);
432         cumulativeWrittenBytes.set(0);
433     }
434 
435     /**
436      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
437      * time.
438      *
439      * @param size
440      *            the recv size
441      * @param limitTraffic
442      *            the traffic limit in bytes per second
443      * @param maxTime
444      *            the max time in ms to wait in case of excess of traffic.
445      * @return the current time to wait (in ms) if needed for Read operation.
446      */
447     @Deprecated
448     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
449         return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
450     }
451 
452     /**
453      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
454      * time.
455      *
456      * @param size
457      *            the recv size
458      * @param limitTraffic
459      *            the traffic limit in bytes per second
460      * @param maxTime
461      *            the max time in ms to wait in case of excess of traffic.
462      * @param now the current time
463      * @return the current time to wait (in ms) if needed for Read operation.
464      */
465     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
466         bytesRecvFlowControl(size);
467         if (size == 0 || limitTraffic == 0) {
468             return 0;
469         }
470         final long lastTimeCheck = lastTime.get();
471         long sum = currentReadBytes.get();
472         long localReadingTime = readingTime;
473         long lastRB = lastReadBytes;
474         final long interval = now - lastTimeCheck;
475         long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
476         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
477             // Enough interval time to compute shaping
478             long time = (sum * 1000 / limitTraffic - interval + pastDelay) / 10 * 10;
479             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
480                 if (logger.isDebugEnabled()) {
481                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
482                 }
483                 if (time > maxTime && now + time - localReadingTime > maxTime) {
484                     time = maxTime;
485                 }
486                 readingTime = Math.max(localReadingTime, now + time);
487                 return time;
488             }
489             readingTime = Math.max(localReadingTime, now);
490             return 0;
491         }
492         // take the last read interval check to get enough interval time
493         long lastsum = sum + lastRB;
494         long lastinterval = interval + checkInterval.get();
495         long time = (lastsum * 1000 / limitTraffic - lastinterval + pastDelay) / 10 * 10;
496         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
497             if (logger.isDebugEnabled()) {
498                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
499             }
500             if (time > maxTime && now + time - localReadingTime > maxTime) {
501                 time = maxTime;
502             }
503             readingTime = Math.max(localReadingTime, now + time);
504             return time;
505         }
506         readingTime = Math.max(localReadingTime, now);
507         return 0;
508     }
509 
510     /**
511      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
512      * the max wait time.
513      *
514      * @param size
515      *            the write size
516      * @param limitTraffic
517      *            the traffic limit in bytes per second
518      * @param maxTime
519      *            the max time in ms to wait in case of excess of traffic.
520      * @return the current time to wait (in ms) if needed for Write operation.
521      */
522     @Deprecated
523     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
524         return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
525     }
526 
527     /**
528      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
529      * the max wait time.
530      *
531      * @param size
532      *            the write size
533      * @param limitTraffic
534      *            the traffic limit in bytes per second
535      * @param maxTime
536      *            the max time in ms to wait in case of excess of traffic.
537      * @param now the current time
538      * @return the current time to wait (in ms) if needed for Write operation.
539      */
540     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
541         bytesWriteFlowControl(size);
542         if (size == 0 || limitTraffic == 0) {
543             return 0;
544         }
545         final long lastTimeCheck = lastTime.get();
546         long sum = currentWrittenBytes.get();
547         long lastWB = lastWrittenBytes;
548         long localWritingTime = writingTime;
549         long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
550         final long interval = now - lastTimeCheck;
551         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
552             // Enough interval time to compute shaping
553             long time = (sum * 1000 / limitTraffic - interval + pastDelay) / 10 * 10;
554             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
555                 if (logger.isDebugEnabled()) {
556                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
557                 }
558                 if (time > maxTime && now + time - localWritingTime > maxTime) {
559                     time = maxTime;
560                 }
561                 writingTime = Math.max(localWritingTime, now + time);
562                 return time;
563             }
564             writingTime = Math.max(localWritingTime, now);
565             return 0;
566         }
567         // take the last write interval check to get enough interval time
568         long lastsum = sum + lastWB;
569         long lastinterval = interval + checkInterval.get();
570         long time = (lastsum * 1000 / limitTraffic - lastinterval + pastDelay) / 10 * 10;
571         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
572             if (logger.isDebugEnabled()) {
573                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
574             }
575             if (time > maxTime && now + time - localWritingTime > maxTime) {
576                 time = maxTime;
577             }
578             writingTime = Math.max(localWritingTime, now + time);
579             return time;
580         }
581         writingTime = Math.max(localWritingTime, now);
582         return 0;
583     }
584 
585     /**
586      * @return the name
587      */
588     public String getName() {
589         return name;
590     }
591 
592     /**
593      * String information
594      */
595     @Override
596     public String toString() {
597         return new StringBuilder(165).append("Monitor ").append(name)
598                 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
599                 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
600                 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
601                 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
602                 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
603                 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
604     }
605 }