View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.traffic;
17  
18  import io.netty5.util.concurrent.EventExecutor;
19  import io.netty5.util.concurrent.EventExecutorGroup;
20  import io.netty5.util.concurrent.Future;
21  import io.netty5.util.internal.logging.InternalLogger;
22  import io.netty5.util.internal.logging.InternalLoggerFactory;
23  
24  import java.util.concurrent.ScheduledExecutorService;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import static io.netty5.util.internal.ObjectUtil.checkNotNullWithIAE;
29  import static java.util.Objects.requireNonNull;
30  
31  
32  /**
33   * Counts the number of read and written bytes for rate-limiting traffic.
34   * <p>
35   * It computes the statistics for both inbound and outbound traffic periodically at the given
36   * {@code checkInterval}, and calls the {@link AbstractTrafficShapingHandler#doAccounting(TrafficCounter)} method back.
37   * If the {@code checkInterval} is {@code 0}, no accounting will be done and statistics will only be computed at each
38   * receive or write operation.
39   * </p>
40   */
41  public class TrafficCounter {
42  
43      private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
44  
45      /**
46       * @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
47       */
48      public static long milliSecondFromNano() {
49          return System.nanoTime() / 1000000;
50      }
51  
52      /**
53       * Current written bytes
54       */
55      private final AtomicLong currentWrittenBytes = new AtomicLong();
56  
57      /**
58       * Current read bytes
59       */
60      private final AtomicLong currentReadBytes = new AtomicLong();
61  
62      /**
63       * Last writing time during current check interval
64       */
65      private long writingTime;
66  
67      /**
68       * Last reading delay during current check interval
69       */
70      private long readingTime;
71  
72      /**
73       * Long life written bytes
74       */
75      private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
76  
77      /**
78       * Long life read bytes
79       */
80      private final AtomicLong cumulativeReadBytes = new AtomicLong();
81  
82      /**
83       * Last Time where cumulative bytes where reset to zero: this time is a real EPOC time (informative only)
84       */
85      private long lastCumulativeTime;
86  
87      /**
88       * Last writing bandwidth
89       */
90      private long lastWriteThroughput;
91  
92      /**
93       * Last reading bandwidth
94       */
95      private long lastReadThroughput;
96  
97      /**
98       * Last Time Check taken
99       */
100     final AtomicLong lastTime = new AtomicLong();
101 
102     /**
103      * Last written bytes number during last check interval
104      */
105     private volatile long lastWrittenBytes;
106 
107     /**
108      * Last read bytes number during last check interval
109      */
110     private volatile long lastReadBytes;
111 
112     /**
113      * Last future writing time during last check interval
114      */
115     private volatile long lastWritingTime;
116 
117     /**
118      * Last reading time during last check interval
119      */
120     private volatile long lastReadingTime;
121 
122     /**
123      * Real written bytes
124      */
125     private final AtomicLong realWrittenBytes = new AtomicLong();
126 
127     /**
128      * Real writing bandwidth
129      */
130     private long realWriteThroughput;
131 
132     /**
133      * Delay between two captures
134      */
135     final AtomicLong checkInterval = new AtomicLong(
136             AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
137 
138     // default 1 s
139 
140     /**
141      * Name of this Monitor
142      */
143     final String name;
144 
145     /**
146      * The associated TrafficShapingHandler
147      */
148     final AbstractTrafficShapingHandler trafficShapingHandler;
149 
150     /**
151      * Executor that will run the monitor
152      */
153     final EventExecutorGroup executor;
154     /**
155      * Monitor created once in start()
156      */
157     Runnable monitor;
158     /**
159      * used in stop() to cancel the timer
160      */
161     volatile Future<?> scheduledFuture;
162 
163     /**
164      * Is Monitor active
165      */
166     volatile boolean monitorActive;
167 
168     /**
169      * Class to implement monitoring at fix delay
170      *
171      */
172     private final class TrafficMonitoringTask implements Runnable {
173         @Override
174         public void run() {
175             if (!monitorActive) {
176                 return;
177             }
178             resetAccounting(milliSecondFromNano());
179             if (trafficShapingHandler != null) {
180                 trafficShapingHandler.doAccounting(TrafficCounter.this);
181             }
182         }
183     }
184 
185     /**
186      * Start the monitoring process.
187      */
188     public synchronized void start() {
189         if (monitorActive) {
190             return;
191         }
192         lastTime.set(milliSecondFromNano());
193         long localCheckInterval = checkInterval.get();
194         // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
195         if (localCheckInterval > 0 && executor != null) {
196             monitorActive = true;
197             monitor = new TrafficMonitoringTask();
198             scheduledFuture =
199                 executor.scheduleAtFixedRate(monitor, 0, localCheckInterval, TimeUnit.MILLISECONDS);
200         }
201     }
202 
203     /**
204      * Stop the monitoring process.
205      */
206     public synchronized void stop() {
207         if (!monitorActive) {
208             return;
209         }
210         monitorActive = false;
211         resetAccounting(milliSecondFromNano());
212         if (trafficShapingHandler != null) {
213             trafficShapingHandler.doAccounting(this);
214         }
215         if (scheduledFuture != null) {
216             scheduledFuture.cancel();
217         }
218     }
219 
220     /**
221      * Reset the accounting on Read and Write.
222      *
223      * @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for.
224      */
225     synchronized void resetAccounting(long newLastTime) {
226         long interval = newLastTime - lastTime.getAndSet(newLastTime);
227         if (interval == 0) {
228             // nothing to do
229             return;
230         }
231         if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
232             logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
233         }
234         lastReadBytes = currentReadBytes.getAndSet(0);
235         lastWrittenBytes = currentWrittenBytes.getAndSet(0);
236         lastReadThroughput = lastReadBytes * 1000 / interval;
237         // nb byte / checkInterval in ms * 1000 (1s)
238         lastWriteThroughput = lastWrittenBytes * 1000 / interval;
239         // nb byte / checkInterval in ms * 1000 (1s)
240         realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
241         lastWritingTime = Math.max(lastWritingTime, writingTime);
242         lastReadingTime = Math.max(lastReadingTime, readingTime);
243     }
244 
245     /**
246      * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the {@link ScheduledExecutorService}
247      * to use, its name, the checkInterval between two computations in milliseconds.
248      *
249      * @param executor
250      *            the underlying executor service for scheduling checks, might be null when used
251      * from {@link GlobalChannelTrafficCounter}.
252      * @param name
253      *            the name given to this monitor.
254      * @param checkInterval
255      *            the checkInterval in millisecond between two computations.
256      */
257     public TrafficCounter(EventExecutor executor, String name, long checkInterval) {
258         requireNonNull(name, "name");
259 
260         trafficShapingHandler = null;
261         this.executor = executor;
262         this.name = name;
263 
264         init(checkInterval);
265     }
266 
267     /**
268      * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
269      * name, the checkInterval between two computations in millisecond.
270      *
271      * @param trafficShapingHandler
272      *            the associated AbstractTrafficShapingHandler.
273      * @param executor
274      *            the underlying executor service for scheduling checks, might be null when used
275      * from {@link GlobalChannelTrafficCounter}.
276      * @param name
277      *            the name given to this monitor.
278      * @param checkInterval
279      *            the checkInterval in millisecond between two computations.
280      */
281     public TrafficCounter(
282             AbstractTrafficShapingHandler trafficShapingHandler, EventExecutorGroup executor,
283             String name, long checkInterval) {
284         this.name = requireNonNull(name, "name");
285         this.trafficShapingHandler = checkNotNullWithIAE(trafficShapingHandler, "trafficShapingHandler");
286         this.executor = executor;
287 
288         init(checkInterval);
289     }
290 
291     private void init(long checkInterval) {
292         // absolute time: informative only
293         lastCumulativeTime = System.currentTimeMillis();
294         writingTime = milliSecondFromNano();
295         readingTime = writingTime;
296         lastWritingTime = writingTime;
297         lastReadingTime = writingTime;
298         configure(checkInterval);
299     }
300 
301     /**
302      * Change checkInterval between two computations in millisecond.
303      *
304      * @param newCheckInterval The new check interval (in milliseconds)
305      */
306     public void configure(long newCheckInterval) {
307         long newInterval = newCheckInterval / 10 * 10;
308         if (checkInterval.getAndSet(newInterval) != newInterval) {
309             stop();
310             if (newInterval <= 0) {
311                 // No more active monitoring
312                 lastTime.set(milliSecondFromNano());
313             } else {
314                 // Restart
315                 start();
316             }
317         }
318     }
319 
320     /**
321      * Computes counters for Read.
322      *
323      * @param recv
324      *            the size in bytes to read
325      */
326     void bytesRecvFlowControl(long recv) {
327         currentReadBytes.addAndGet(recv);
328         cumulativeReadBytes.addAndGet(recv);
329     }
330 
331     /**
332      * Computes counters for Write.
333      *
334      * @param write
335      *            the size in bytes to write
336      */
337     void bytesWriteFlowControl(long write) {
338         currentWrittenBytes.addAndGet(write);
339         cumulativeWrittenBytes.addAndGet(write);
340     }
341 
342     /**
343      * Computes counters for Real Write.
344      *
345      * @param write
346      *            the size in bytes to write
347      */
348     void bytesRealWriteFlowControl(long write) {
349         realWrittenBytes.addAndGet(write);
350     }
351 
352     /**
353      * @return the current checkInterval between two computations of traffic counter
354      *         in millisecond.
355      */
356     public long checkInterval() {
357         return checkInterval.get();
358     }
359 
360     /**
361      * @return the Read Throughput in bytes/s computes in the last check interval.
362      */
363     public long lastReadThroughput() {
364         return lastReadThroughput;
365     }
366 
367     /**
368      * @return the Write Throughput in bytes/s computes in the last check interval.
369      */
370     public long lastWriteThroughput() {
371         return lastWriteThroughput;
372     }
373 
374     /**
375      * @return the number of bytes read during the last check Interval.
376      */
377     public long lastReadBytes() {
378         return lastReadBytes;
379     }
380 
381     /**
382      * @return the number of bytes written during the last check Interval.
383      */
384     public long lastWrittenBytes() {
385         return lastWrittenBytes;
386     }
387 
388     /**
389      * @return the current number of bytes read since the last checkInterval.
390      */
391     public long currentReadBytes() {
392         return currentReadBytes.get();
393     }
394 
395     /**
396      * @return the current number of bytes written since the last check Interval.
397      */
398     public long currentWrittenBytes() {
399         return currentWrittenBytes.get();
400     }
401 
402     /**
403      * @return the Time in millisecond of the last check as of System.currentTimeMillis().
404      */
405     public long lastTime() {
406         return lastTime.get();
407     }
408 
409     /**
410      * @return the cumulativeWrittenBytes
411      */
412     public long cumulativeWrittenBytes() {
413         return cumulativeWrittenBytes.get();
414     }
415 
416     /**
417      * @return the cumulativeReadBytes
418      */
419     public long cumulativeReadBytes() {
420         return cumulativeReadBytes.get();
421     }
422 
423     /**
424      * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
425      * when the cumulative counters were reset to 0.
426      */
427     public long lastCumulativeTime() {
428         return lastCumulativeTime;
429     }
430 
431     /**
432      * @return the realWrittenBytes
433      */
434     public AtomicLong getRealWrittenBytes() {
435         return realWrittenBytes;
436     }
437 
438     /**
439      * @return the realWriteThroughput
440      */
441     public long getRealWriteThroughput() {
442         return realWriteThroughput;
443     }
444 
445     /**
446      * Reset both read and written cumulative bytes counters and the associated absolute time
447      * from System.currentTimeMillis().
448      */
449     public void resetCumulativeTime() {
450         lastCumulativeTime = System.currentTimeMillis();
451         cumulativeReadBytes.set(0);
452         cumulativeWrittenBytes.set(0);
453     }
454 
455     /**
456      * @return the name of this TrafficCounter.
457      */
458     public String name() {
459         return name;
460     }
461 
462     /**
463      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
464      * time.
465      *
466      * @param size
467      *            the recv size
468      * @param limitTraffic
469      *            the traffic limit in bytes per second.
470      * @param maxTime
471      *            the max time in ms to wait in case of excess of traffic.
472      * @return the current time to wait (in ms) if needed for Read operation.
473      */
474     @Deprecated
475     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
476         return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
477     }
478 
479     /**
480      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
481      * time.
482      *
483      * @param size
484      *            the recv size
485      * @param limitTraffic
486      *            the traffic limit in bytes per second
487      * @param maxTime
488      *            the max time in ms to wait in case of excess of traffic.
489      * @param now the current time
490      * @return the current time to wait (in ms) if needed for Read operation.
491      */
492     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
493         bytesRecvFlowControl(size);
494         if (size == 0 || limitTraffic == 0) {
495             return 0;
496         }
497         final long lastTimeCheck = lastTime.get();
498         long sum = currentReadBytes.get();
499         long localReadingTime = readingTime;
500         long lastRB = lastReadBytes;
501         final long interval = now - lastTimeCheck;
502         long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
503         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
504             // Enough interval time to compute shaping
505             long time = sum * 1000 / limitTraffic - interval + pastDelay;
506             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
507                 if (logger.isDebugEnabled()) {
508                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
509                 }
510                 if (time > maxTime && now + time - localReadingTime > maxTime) {
511                     time = maxTime;
512                 }
513                 readingTime = Math.max(localReadingTime, now + time);
514                 return time;
515             }
516             readingTime = Math.max(localReadingTime, now);
517             return 0;
518         }
519         // take the last read interval check to get enough interval time
520         long lastsum = sum + lastRB;
521         long lastinterval = interval + checkInterval.get();
522         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
523         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
524             if (logger.isDebugEnabled()) {
525                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
526             }
527             if (time > maxTime && now + time - localReadingTime > maxTime) {
528                 time = maxTime;
529             }
530             readingTime = Math.max(localReadingTime, now + time);
531             return time;
532         }
533         readingTime = Math.max(localReadingTime, now);
534         return 0;
535     }
536 
537     /**
538      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
539      * the max wait time.
540      *
541      * @param size
542      *            the write size
543      * @param limitTraffic
544      *            the traffic limit in bytes per second.
545      * @param maxTime
546      *            the max time in ms to wait in case of excess of traffic.
547      * @return the current time to wait (in ms) if needed for Write operation.
548      */
549     @Deprecated
550     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
551         return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
552     }
553 
554     /**
555      * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
556      * the max wait time.
557      *
558      * @param size
559      *            the write size
560      * @param limitTraffic
561      *            the traffic limit in bytes per second.
562      * @param maxTime
563      *            the max time in ms to wait in case of excess of traffic.
564      * @param now the current time
565      * @return the current time to wait (in ms) if needed for Write operation.
566      */
567     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
568         bytesWriteFlowControl(size);
569         if (size == 0 || limitTraffic == 0) {
570             return 0;
571         }
572         final long lastTimeCheck = lastTime.get();
573         long sum = currentWrittenBytes.get();
574         long lastWB = lastWrittenBytes;
575         long localWritingTime = writingTime;
576         long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
577         final long interval = now - lastTimeCheck;
578         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
579             // Enough interval time to compute shaping
580             long time = sum * 1000 / limitTraffic - interval + pastDelay;
581             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
582                 if (logger.isDebugEnabled()) {
583                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
584                 }
585                 if (time > maxTime && now + time - localWritingTime > maxTime) {
586                     time = maxTime;
587                 }
588                 writingTime = Math.max(localWritingTime, now + time);
589                 return time;
590             }
591             writingTime = Math.max(localWritingTime, now);
592             return 0;
593         }
594         // take the last write interval check to get enough interval time
595         long lastsum = sum + lastWB;
596         long lastinterval = interval + checkInterval.get();
597         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
598         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
599             if (logger.isDebugEnabled()) {
600                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
601             }
602             if (time > maxTime && now + time - localWritingTime > maxTime) {
603                 time = maxTime;
604             }
605             writingTime = Math.max(localWritingTime, now + time);
606             return time;
607         }
608         writingTime = Math.max(localWritingTime, now);
609         return 0;
610     }
611 
612     @Override
613     public String toString() {
614         return new StringBuilder(165).append("Monitor ").append(name)
615                 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
616                 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
617                 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
618                 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
619                 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
620                 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
621     }
622 }