View Javadoc

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package org.jboss.netty.handler.traffic;
17  
18  import org.jboss.netty.logging.InternalLogger;
19  import org.jboss.netty.logging.InternalLoggerFactory;
20  import org.jboss.netty.util.Timeout;
21  import org.jboss.netty.util.Timer;
22  import org.jboss.netty.util.TimerTask;
23  
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  /**
29   * TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.<br>
30   * <br>
31   * A TrafficCounter has for goal to count the traffic in order to enable to limit the traffic or not,
32   * globally or per channel. It compute statistics on read and written bytes at the specified
33   * interval and call back the {@link AbstractTrafficShapingHandler} doAccounting method at every
34   * specified interval. If this interval is set to 0, therefore no accounting will be done and only
35   * statistics will be computed at each receive or write operations.
36   */
37  public class TrafficCounter {
38      private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
39  
40      /**
41       * Current written bytes
42       */
43      private final AtomicLong currentWrittenBytes = new AtomicLong();
44  
45      /**
46       * Current read bytes
47       */
48      private final AtomicLong currentReadBytes = new AtomicLong();
49  
50      /**
51       * Long life written bytes
52       */
53      private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
54  
55      /**
56       * Long life read bytes
57       */
58      private final AtomicLong cumulativeReadBytes = new AtomicLong();
59  
60      /**
61       * Last Time where cumulative bytes where reset to zero
62       */
63      private long lastCumulativeTime;
64  
65      /**
66       * Last writing bandwidth
67       */
68      private long lastWriteThroughput;
69  
70      /**
71       * Last reading bandwidth
72       */
73      private long lastReadThroughput;
74  
75      /**
76       * Last Time Check taken
77       */
78      private final AtomicLong lastTime = new AtomicLong();
79  
80      /**
81       * Last written bytes number during last check interval
82       */
83      private long lastWrittenBytes;
84  
85      /**
86       * Last read bytes number during last check interval
87       */
88      private long lastReadBytes;
89  
90      /**
91       * Last non 0 written bytes number during last check interval
92       */
93      private long lastNonNullWrittenBytes;
94  
95      /**
96       * Last time written bytes with non 0 written bytes
97       */
98      private long lastNonNullWrittenTime;
99  
100     /**
101      * Last time read bytes with non 0 written bytes
102      */
103     private long lastNonNullReadTime;
104 
105     /**
106      * Last non 0 read bytes number during last check interval
107      */
108     private long lastNonNullReadBytes;
109 
110     /**
111      * Delay between two captures
112      */
113     final AtomicLong checkInterval = new AtomicLong(
114             AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
115 
116     // default 1 s
117 
118     /**
119      * Name of this Monitor
120      */
121     final String name;
122 
123     /**
124      * The associated TrafficShapingHandler
125      */
126     private final AbstractTrafficShapingHandler trafficShapingHandler;
127 
128     /**
129      * One Timer for all Counter
130      */
131     private final Timer timer;  // replace executor
132     /**
133      * Monitor created once in start()
134      */
135     private TimerTask timerTask;
136     /**
137      * used in stop() to cancel the timer
138      */
139    private volatile Timeout timeout;
140 
141     /**
142      * Is Monitor active
143      */
144     final AtomicBoolean monitorActive = new AtomicBoolean();
145 
146     /**
147      * Class to implement monitoring at fix delay
148      *
149      */
150     private static class TrafficMonitoringTask implements TimerTask {
151         /**
152          * The associated TrafficShapingHandler
153          */
154         private final AbstractTrafficShapingHandler trafficShapingHandler1;
155 
156         /**
157          * The associated TrafficCounter
158          */
159         private final TrafficCounter counter;
160 
161         protected TrafficMonitoringTask(
162                 AbstractTrafficShapingHandler trafficShapingHandler,
163                 TrafficCounter counter) {
164             trafficShapingHandler1 = trafficShapingHandler;
165             this.counter = counter;
166         }
167 
168         public void run(Timeout timeout) throws Exception {
169             if (!counter.monitorActive.get()) {
170                 return;
171             }
172             long endTime = System.currentTimeMillis();
173             counter.resetAccounting(endTime);
174             if (trafficShapingHandler1 != null) {
175                 trafficShapingHandler1.doAccounting(counter);
176             }
177 
178             counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
179         }
180     }
181 
182     /**
183      * Start the monitoring process
184      */
185     public void start() {
186         synchronized (lastTime) {
187             if (monitorActive.get()) {
188                 return;
189             }
190             lastTime.set(System.currentTimeMillis());
191             if (checkInterval.get() > 0) {
192                 monitorActive.set(true);
193                 timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
194                 timeout =
195                     timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
196             }
197         }
198     }
199 
200     /**
201      * Stop the monitoring process
202      */
203     public void stop() {
204         synchronized (lastTime) {
205             if (!monitorActive.get()) {
206                 return;
207             }
208             monitorActive.set(false);
209             resetAccounting(System.currentTimeMillis());
210             if (trafficShapingHandler != null) {
211                 trafficShapingHandler.doAccounting(this);
212             }
213             if (timeout != null) {
214                 timeout.cancel();
215             }
216         }
217     }
218 
219     /**
220      * Reset the accounting on Read and Write
221      */
222     void resetAccounting(long newLastTime) {
223         synchronized (lastTime) {
224             long interval = newLastTime - lastTime.getAndSet(newLastTime);
225             if (interval == 0) {
226                 // nothing to do
227                 return;
228             }
229             lastReadBytes = currentReadBytes.getAndSet(0);
230             lastWrittenBytes = currentWrittenBytes.getAndSet(0);
231             lastReadThroughput = lastReadBytes * 1000 / interval;
232             // nb byte / checkInterval in ms * 1000 (1s)
233             lastWriteThroughput = lastWrittenBytes * 1000 / interval;
234             // nb byte / checkInterval in ms * 1000 (1s)
235         }
236         if (lastWrittenBytes > 0) {
237             lastNonNullWrittenBytes = lastWrittenBytes;
238             lastNonNullWrittenTime = newLastTime;
239         }
240         if (lastReadBytes > 0) {
241             lastNonNullReadBytes = lastReadBytes;
242             lastNonNullReadTime = newLastTime;
243         }
244     }
245 
246     /**
247      * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
248      * name, the checkInterval between two computations in millisecond
249      * @param trafficShapingHandler the associated AbstractTrafficShapingHandler
250      * @param timer
251      *            Could be a HashedWheelTimer
252      * @param name
253      *            the name given to this monitor
254      * @param checkInterval
255      *            the checkInterval in millisecond between two computations
256      */
257     public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
258             Timer timer, String name, long checkInterval) {
259         this.trafficShapingHandler = trafficShapingHandler;
260         this.timer = timer;
261         this.name = name;
262         lastCumulativeTime = System.currentTimeMillis();
263         configure(checkInterval);
264     }
265 
266     /**
267      * Change checkInterval between
268      * two computations in millisecond
269      */
270     public void configure(long newcheckInterval) {
271         long newInterval = newcheckInterval / 10 * 10;
272         if (checkInterval.get() != newInterval) {
273             checkInterval.set(newInterval);
274             if (newInterval <= 0) {
275                 stop();
276                 // No more active monitoring
277                 lastTime.set(System.currentTimeMillis());
278             } else {
279                 // Start if necessary
280                 start();
281             }
282         }
283     }
284 
285     /**
286      * Computes counters for Read.
287      *
288      * @param recv
289      *            the size in bytes to read
290      */
291     void bytesRecvFlowControl(long recv) {
292         currentReadBytes.addAndGet(recv);
293         cumulativeReadBytes.addAndGet(recv);
294     }
295 
296     /**
297      * Computes counters for Write.
298      *
299      * @param write
300      *            the size in bytes to write
301      */
302     void bytesWriteFlowControl(long write) {
303         currentWrittenBytes.addAndGet(write);
304         cumulativeWrittenBytes.addAndGet(write);
305     }
306 
307     /**
308      *
309      * @return the current checkInterval between two computations of traffic counter
310      *         in millisecond
311      */
312     public long getCheckInterval() {
313         return checkInterval.get();
314     }
315 
316     /**
317      *
318      * @return the Read Throughput in bytes/s computes in the last check interval
319      */
320     public long getLastReadThroughput() {
321         return lastReadThroughput;
322     }
323 
324     /**
325      *
326      * @return the Write Throughput in bytes/s computes in the last check interval
327      */
328     public long getLastWriteThroughput() {
329         return lastWriteThroughput;
330     }
331 
332     /**
333      *
334      * @return the number of bytes read during the last check Interval
335      */
336     public long getLastReadBytes() {
337         return lastReadBytes;
338     }
339 
340     /**
341      *
342      * @return the number of bytes written during the last check Interval
343      */
344     public long getLastWrittenBytes() {
345         return lastWrittenBytes;
346     }
347 
348     /**
349     *
350     * @return the current number of bytes read since the last checkInterval
351     */
352     public long getCurrentReadBytes() {
353         return currentReadBytes.get();
354     }
355 
356     /**
357      *
358      * @return the current number of bytes written since the last check Interval
359      */
360     public long getCurrentWrittenBytes() {
361         return currentWrittenBytes.get();
362     }
363 
364     /**
365      * @return the Time in millisecond of the last check as of System.currentTimeMillis()
366      */
367     public long getLastTime() {
368         return lastTime.get();
369     }
370 
371     /**
372      * @return the cumulativeWrittenBytes
373      */
374     public long getCumulativeWrittenBytes() {
375         return cumulativeWrittenBytes.get();
376     }
377 
378     /**
379      * @return the cumulativeReadBytes
380      */
381     public long getCumulativeReadBytes() {
382         return cumulativeReadBytes.get();
383     }
384 
385     /**
386      * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
387      * when the cumulative counters were reset to 0.
388      */
389     public long getLastCumulativeTime() {
390         return lastCumulativeTime;
391     }
392 
393     /**
394      * Reset both read and written cumulative bytes counters and the associated time.
395      */
396     public void resetCumulativeTime() {
397         lastCumulativeTime = System.currentTimeMillis();
398         cumulativeReadBytes.set(0);
399         cumulativeWrittenBytes.set(0);
400     }
401 
402     /**
403      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
404      * time
405      *
406      * @param size
407      *            the recv size
408      * @param limitTraffic
409      *            the traffic limit in bytes per second
410      * @param maxTime
411      *            the max time in ms to wait in case of excess of traffic
412      * @return the current time to wait (in ms) if needed for Read operation
413      */
414     public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
415         final long now = System.currentTimeMillis();
416         bytesRecvFlowControl(size);
417         if (limitTraffic == 0) {
418             return 0;
419         }
420         long sum = currentReadBytes.get();
421         long interval = now - lastTime.get();
422         // Short time checking
423         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
424             long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
425             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
426                 if (logger.isDebugEnabled()) {
427                     logger.debug("Time: " + time + ":" + sum + ":" + interval);
428                 }
429                 return time > maxTime ? maxTime : time;
430             }
431             return 0;
432         }
433         // long time checking
434         if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
435             long lastsum = sum + lastNonNullReadBytes;
436             long lastinterval = now - lastNonNullReadTime;
437             long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
438             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
439                 if (logger.isDebugEnabled()) {
440                     logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
441                 }
442                 return time > maxTime ? maxTime : time;
443             }
444         } else {
445             // final "middle" time checking in case resetAccounting called very recently
446             sum += lastReadBytes;
447             long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT;
448             long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
449             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
450                 if (logger.isDebugEnabled()) {
451                     logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
452                 }
453                 return time > maxTime ? maxTime : time;
454             }
455         }
456         return 0;
457     }
458 
459     /**
460      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
461      * the max wait time
462      *
463      * @param size
464      *            the write size
465      * @param limitTraffic
466      *            the traffic limit in bytes per second
467      * @param maxTime
468      *            the max time in ms to wait in case of excess of traffic
469      * @return the current time to wait (in ms) if needed for Write operation
470      */
471     public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
472         bytesWriteFlowControl(size);
473         if (limitTraffic == 0) {
474             return 0;
475         }
476         long sum = currentWrittenBytes.get();
477         final long now = System.currentTimeMillis();
478         long interval = now - lastTime.get();
479         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
480             long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
481             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
482                 if (logger.isDebugEnabled()) {
483                     logger.debug("Time: " + time + ":" + sum + ":" + interval);
484                 }
485                 return time > maxTime ? maxTime : time;
486             }
487             return 0;
488         }
489         if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
490             long lastsum = sum + lastNonNullWrittenBytes;
491             long lastinterval = now - lastNonNullWrittenTime;
492             long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
493             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
494                 if (logger.isDebugEnabled()) {
495                     logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
496                 }
497                 return time > maxTime ? maxTime : time;
498             }
499         } else {
500             sum += lastWrittenBytes;
501             long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval);
502             long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
503             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
504                 if (logger.isDebugEnabled()) {
505                     logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
506                 }
507                 return time > maxTime ? maxTime : time;
508             }
509         }
510         return 0;
511     }
512 
513     /**
514      * @return the name
515      */
516     public String getName() {
517         return name;
518     }
519 
520     /**
521      * String information
522      */
523     @Override
524     public String toString() {
525         return "Monitor " + name + " Current Speed Read: " +
526                 (lastReadThroughput >> 10) + " KB/s, Write: " +
527                 (lastWriteThroughput >> 10) + " KB/s Current Read: " +
528                 (currentReadBytes.get() >> 10) + " KB Current Write: " +
529                 (currentWrittenBytes.get() >> 10) + " KB";
530     }
531 }