1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty5.handler.traffic;
17  
18  import io.netty5.util.concurrent.EventExecutor;
19  import io.netty5.util.concurrent.EventExecutorGroup;
20  import io.netty5.util.concurrent.Future;
21  import io.netty5.util.internal.logging.InternalLogger;
22  import io.netty5.util.internal.logging.InternalLoggerFactory;
23  
24  import java.util.concurrent.ScheduledExecutorService;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import static io.netty5.util.internal.ObjectUtil.checkNotNullWithIAE;
29  import static java.util.Objects.requireNonNull;
30  
31  
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  public class TrafficCounter {
42  
43      private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
44  
45      
46  
47  
48      public static long milliSecondFromNano() {
49          return System.nanoTime() / 1000000;
50      }
51  
52      
53  
54  
55      private final AtomicLong currentWrittenBytes = new AtomicLong();
56  
57      
58  
59  
60      private final AtomicLong currentReadBytes = new AtomicLong();
61  
62      
63  
64  
65      private long writingTime;
66  
67      
68  
69  
70      private long readingTime;
71  
72      
73  
74  
75      private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
76  
77      
78  
79  
80      private final AtomicLong cumulativeReadBytes = new AtomicLong();
81  
82      
83  
84  
85      private long lastCumulativeTime;
86  
87      
88  
89  
90      private long lastWriteThroughput;
91  
92      
93  
94  
95      private long lastReadThroughput;
96  
97      
98  
99  
100     final AtomicLong lastTime = new AtomicLong();
101 
102     
103 
104 
105     private volatile long lastWrittenBytes;
106 
107     
108 
109 
110     private volatile long lastReadBytes;
111 
112     
113 
114 
115     private volatile long lastWritingTime;
116 
117     
118 
119 
120     private volatile long lastReadingTime;
121 
122     
123 
124 
125     private final AtomicLong realWrittenBytes = new AtomicLong();
126 
127     
128 
129 
130     private long realWriteThroughput;
131 
132     
133 
134 
135     final AtomicLong checkInterval = new AtomicLong(
136             AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
137 
138     
139 
140     
141 
142 
143     final String name;
144 
145     
146 
147 
148     final AbstractTrafficShapingHandler trafficShapingHandler;
149 
150     
151 
152 
153     final EventExecutorGroup executor;
154     
155 
156 
157     Runnable monitor;
158     
159 
160 
161     volatile Future<?> scheduledFuture;
162 
163     
164 
165 
166     volatile boolean monitorActive;
167 
168     
169 
170 
171 
172     private final class TrafficMonitoringTask implements Runnable {
173         @Override
174         public void run() {
175             if (!monitorActive) {
176                 return;
177             }
178             resetAccounting(milliSecondFromNano());
179             if (trafficShapingHandler != null) {
180                 trafficShapingHandler.doAccounting(TrafficCounter.this);
181             }
182         }
183     }
184 
185     
186 
187 
188     public synchronized void start() {
189         if (monitorActive) {
190             return;
191         }
192         lastTime.set(milliSecondFromNano());
193         long localCheckInterval = checkInterval.get();
194         
195         if (localCheckInterval > 0 && executor != null) {
196             monitorActive = true;
197             monitor = new TrafficMonitoringTask();
198             scheduledFuture =
199                 executor.scheduleAtFixedRate(monitor, 0, localCheckInterval, TimeUnit.MILLISECONDS);
200         }
201     }
202 
203     
204 
205 
206     public synchronized void stop() {
207         if (!monitorActive) {
208             return;
209         }
210         monitorActive = false;
211         resetAccounting(milliSecondFromNano());
212         if (trafficShapingHandler != null) {
213             trafficShapingHandler.doAccounting(this);
214         }
215         if (scheduledFuture != null) {
216             scheduledFuture.cancel();
217         }
218     }
219 
220     
221 
222 
223 
224 
225     synchronized void resetAccounting(long newLastTime) {
226         long interval = newLastTime - lastTime.getAndSet(newLastTime);
227         if (interval == 0) {
228             
229             return;
230         }
231         if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
232             logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
233         }
234         lastReadBytes = currentReadBytes.getAndSet(0);
235         lastWrittenBytes = currentWrittenBytes.getAndSet(0);
236         lastReadThroughput = lastReadBytes * 1000 / interval;
237         
238         lastWriteThroughput = lastWrittenBytes * 1000 / interval;
239         
240         realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
241         lastWritingTime = Math.max(lastWritingTime, writingTime);
242         lastReadingTime = Math.max(lastReadingTime, readingTime);
243     }
244 
245     
246 
247 
248 
249 
250 
251 
252 
253 
254 
255 
256 
257     public TrafficCounter(EventExecutor executor, String name, long checkInterval) {
258         requireNonNull(name, "name");
259 
260         trafficShapingHandler = null;
261         this.executor = executor;
262         this.name = name;
263 
264         init(checkInterval);
265     }
266 
267     
268 
269 
270 
271 
272 
273 
274 
275 
276 
277 
278 
279 
280 
281     public TrafficCounter(
282             AbstractTrafficShapingHandler trafficShapingHandler, EventExecutorGroup executor,
283             String name, long checkInterval) {
284         this.name = requireNonNull(name, "name");
285         this.trafficShapingHandler = checkNotNullWithIAE(trafficShapingHandler, "trafficShapingHandler");
286         this.executor = executor;
287 
288         init(checkInterval);
289     }
290 
291     private void init(long checkInterval) {
292         
293         lastCumulativeTime = System.currentTimeMillis();
294         writingTime = milliSecondFromNano();
295         readingTime = writingTime;
296         lastWritingTime = writingTime;
297         lastReadingTime = writingTime;
298         configure(checkInterval);
299     }
300 
301     
302 
303 
304 
305 
306     public void configure(long newCheckInterval) {
307         long newInterval = newCheckInterval / 10 * 10;
308         if (checkInterval.getAndSet(newInterval) != newInterval) {
309             stop();
310             if (newInterval <= 0) {
311                 
312                 lastTime.set(milliSecondFromNano());
313             } else {
314                 
315                 start();
316             }
317         }
318     }
319 
320     
321 
322 
323 
324 
325 
326     void bytesRecvFlowControl(long recv) {
327         currentReadBytes.addAndGet(recv);
328         cumulativeReadBytes.addAndGet(recv);
329     }
330 
331     
332 
333 
334 
335 
336 
337     void bytesWriteFlowControl(long write) {
338         currentWrittenBytes.addAndGet(write);
339         cumulativeWrittenBytes.addAndGet(write);
340     }
341 
342     
343 
344 
345 
346 
347 
348     void bytesRealWriteFlowControl(long write) {
349         realWrittenBytes.addAndGet(write);
350     }
351 
352     
353 
354 
355 
356     public long checkInterval() {
357         return checkInterval.get();
358     }
359 
360     
361 
362 
363     public long lastReadThroughput() {
364         return lastReadThroughput;
365     }
366 
367     
368 
369 
370     public long lastWriteThroughput() {
371         return lastWriteThroughput;
372     }
373 
374     
375 
376 
377     public long lastReadBytes() {
378         return lastReadBytes;
379     }
380 
381     
382 
383 
384     public long lastWrittenBytes() {
385         return lastWrittenBytes;
386     }
387 
388     
389 
390 
391     public long currentReadBytes() {
392         return currentReadBytes.get();
393     }
394 
395     
396 
397 
398     public long currentWrittenBytes() {
399         return currentWrittenBytes.get();
400     }
401 
402     
403 
404 
405     public long lastTime() {
406         return lastTime.get();
407     }
408 
409     
410 
411 
412     public long cumulativeWrittenBytes() {
413         return cumulativeWrittenBytes.get();
414     }
415 
416     
417 
418 
419     public long cumulativeReadBytes() {
420         return cumulativeReadBytes.get();
421     }
422 
423     
424 
425 
426 
427     public long lastCumulativeTime() {
428         return lastCumulativeTime;
429     }
430 
431     
432 
433 
434     public AtomicLong getRealWrittenBytes() {
435         return realWrittenBytes;
436     }
437 
438     
439 
440 
441     public long getRealWriteThroughput() {
442         return realWriteThroughput;
443     }
444 
445     
446 
447 
448 
449     public void resetCumulativeTime() {
450         lastCumulativeTime = System.currentTimeMillis();
451         cumulativeReadBytes.set(0);
452         cumulativeWrittenBytes.set(0);
453     }
454 
455     
456 
457 
458     public String name() {
459         return name;
460     }
461 
462     
463 
464 
465 
466 
467 
468 
469 
470 
471 
472 
473 
474     @Deprecated
475     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
476         return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
477     }
478 
479     
480 
481 
482 
483 
484 
485 
486 
487 
488 
489 
490 
491 
492     public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
493         bytesRecvFlowControl(size);
494         if (size == 0 || limitTraffic == 0) {
495             return 0;
496         }
497         final long lastTimeCheck = lastTime.get();
498         long sum = currentReadBytes.get();
499         long localReadingTime = readingTime;
500         long lastRB = lastReadBytes;
501         final long interval = now - lastTimeCheck;
502         long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
503         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
504             
505             long time = sum * 1000 / limitTraffic - interval + pastDelay;
506             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
507                 if (logger.isDebugEnabled()) {
508                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
509                 }
510                 if (time > maxTime && now + time - localReadingTime > maxTime) {
511                     time = maxTime;
512                 }
513                 readingTime = Math.max(localReadingTime, now + time);
514                 return time;
515             }
516             readingTime = Math.max(localReadingTime, now);
517             return 0;
518         }
519         
520         long lastsum = sum + lastRB;
521         long lastinterval = interval + checkInterval.get();
522         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
523         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
524             if (logger.isDebugEnabled()) {
525                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
526             }
527             if (time > maxTime && now + time - localReadingTime > maxTime) {
528                 time = maxTime;
529             }
530             readingTime = Math.max(localReadingTime, now + time);
531             return time;
532         }
533         readingTime = Math.max(localReadingTime, now);
534         return 0;
535     }
536 
537     
538 
539 
540 
541 
542 
543 
544 
545 
546 
547 
548 
549     @Deprecated
550     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
551         return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
552     }
553 
554     
555 
556 
557 
558 
559 
560 
561 
562 
563 
564 
565 
566 
567     public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
568         bytesWriteFlowControl(size);
569         if (size == 0 || limitTraffic == 0) {
570             return 0;
571         }
572         final long lastTimeCheck = lastTime.get();
573         long sum = currentWrittenBytes.get();
574         long lastWB = lastWrittenBytes;
575         long localWritingTime = writingTime;
576         long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
577         final long interval = now - lastTimeCheck;
578         if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
579             
580             long time = sum * 1000 / limitTraffic - interval + pastDelay;
581             if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
582                 if (logger.isDebugEnabled()) {
583                     logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
584                 }
585                 if (time > maxTime && now + time - localWritingTime > maxTime) {
586                     time = maxTime;
587                 }
588                 writingTime = Math.max(localWritingTime, now + time);
589                 return time;
590             }
591             writingTime = Math.max(localWritingTime, now);
592             return 0;
593         }
594         
595         long lastsum = sum + lastWB;
596         long lastinterval = interval + checkInterval.get();
597         long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
598         if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
599             if (logger.isDebugEnabled()) {
600                 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
601             }
602             if (time > maxTime && now + time - localWritingTime > maxTime) {
603                 time = maxTime;
604             }
605             writingTime = Math.max(localWritingTime, now + time);
606             return time;
607         }
608         writingTime = Math.max(localWritingTime, now);
609         return 0;
610     }
611 
612     @Override
613     public String toString() {
614         return new StringBuilder(165).append("Monitor ").append(name)
615                 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
616                 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
617                 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
618                 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
619                 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
620                 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
621     }
622 }