1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.traffic;
17
18 import io.netty5.util.concurrent.EventExecutor;
19 import io.netty5.util.concurrent.EventExecutorGroup;
20 import io.netty5.util.concurrent.Future;
21 import io.netty5.util.internal.logging.InternalLogger;
22 import io.netty5.util.internal.logging.InternalLoggerFactory;
23
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import static io.netty5.util.internal.ObjectUtil.checkNotNullWithIAE;
29 import static java.util.Objects.requireNonNull;
30
31
32
33
34
35
36
37
38
39
40
41 public class TrafficCounter {
42
43 private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
44
45
46
47
48 public static long milliSecondFromNano() {
49 return System.nanoTime() / 1000000;
50 }
51
52
53
54
55 private final AtomicLong currentWrittenBytes = new AtomicLong();
56
57
58
59
60 private final AtomicLong currentReadBytes = new AtomicLong();
61
62
63
64
65 private long writingTime;
66
67
68
69
70 private long readingTime;
71
72
73
74
75 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
76
77
78
79
80 private final AtomicLong cumulativeReadBytes = new AtomicLong();
81
82
83
84
85 private long lastCumulativeTime;
86
87
88
89
90 private long lastWriteThroughput;
91
92
93
94
95 private long lastReadThroughput;
96
97
98
99
100 final AtomicLong lastTime = new AtomicLong();
101
102
103
104
105 private volatile long lastWrittenBytes;
106
107
108
109
110 private volatile long lastReadBytes;
111
112
113
114
115 private volatile long lastWritingTime;
116
117
118
119
120 private volatile long lastReadingTime;
121
122
123
124
125 private final AtomicLong realWrittenBytes = new AtomicLong();
126
127
128
129
130 private long realWriteThroughput;
131
132
133
134
135 final AtomicLong checkInterval = new AtomicLong(
136 AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
137
138
139
140
141
142
143 final String name;
144
145
146
147
148 final AbstractTrafficShapingHandler trafficShapingHandler;
149
150
151
152
153 final EventExecutorGroup executor;
154
155
156
157 Runnable monitor;
158
159
160
161 volatile Future<?> scheduledFuture;
162
163
164
165
166 volatile boolean monitorActive;
167
168
169
170
171
172 private final class TrafficMonitoringTask implements Runnable {
173 @Override
174 public void run() {
175 if (!monitorActive) {
176 return;
177 }
178 resetAccounting(milliSecondFromNano());
179 if (trafficShapingHandler != null) {
180 trafficShapingHandler.doAccounting(TrafficCounter.this);
181 }
182 }
183 }
184
185
186
187
188 public synchronized void start() {
189 if (monitorActive) {
190 return;
191 }
192 lastTime.set(milliSecondFromNano());
193 long localCheckInterval = checkInterval.get();
194
195 if (localCheckInterval > 0 && executor != null) {
196 monitorActive = true;
197 monitor = new TrafficMonitoringTask();
198 scheduledFuture =
199 executor.scheduleAtFixedRate(monitor, 0, localCheckInterval, TimeUnit.MILLISECONDS);
200 }
201 }
202
203
204
205
206 public synchronized void stop() {
207 if (!monitorActive) {
208 return;
209 }
210 monitorActive = false;
211 resetAccounting(milliSecondFromNano());
212 if (trafficShapingHandler != null) {
213 trafficShapingHandler.doAccounting(this);
214 }
215 if (scheduledFuture != null) {
216 scheduledFuture.cancel();
217 }
218 }
219
220
221
222
223
224
225 synchronized void resetAccounting(long newLastTime) {
226 long interval = newLastTime - lastTime.getAndSet(newLastTime);
227 if (interval == 0) {
228
229 return;
230 }
231 if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
232 logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
233 }
234 lastReadBytes = currentReadBytes.getAndSet(0);
235 lastWrittenBytes = currentWrittenBytes.getAndSet(0);
236 lastReadThroughput = lastReadBytes * 1000 / interval;
237
238 lastWriteThroughput = lastWrittenBytes * 1000 / interval;
239
240 realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
241 lastWritingTime = Math.max(lastWritingTime, writingTime);
242 lastReadingTime = Math.max(lastReadingTime, readingTime);
243 }
244
245
246
247
248
249
250
251
252
253
254
255
256
257 public TrafficCounter(EventExecutor executor, String name, long checkInterval) {
258 requireNonNull(name, "name");
259
260 trafficShapingHandler = null;
261 this.executor = executor;
262 this.name = name;
263
264 init(checkInterval);
265 }
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281 public TrafficCounter(
282 AbstractTrafficShapingHandler trafficShapingHandler, EventExecutorGroup executor,
283 String name, long checkInterval) {
284 this.name = requireNonNull(name, "name");
285 this.trafficShapingHandler = checkNotNullWithIAE(trafficShapingHandler, "trafficShapingHandler");
286 this.executor = executor;
287
288 init(checkInterval);
289 }
290
291 private void init(long checkInterval) {
292
293 lastCumulativeTime = System.currentTimeMillis();
294 writingTime = milliSecondFromNano();
295 readingTime = writingTime;
296 lastWritingTime = writingTime;
297 lastReadingTime = writingTime;
298 configure(checkInterval);
299 }
300
301
302
303
304
305
306 public void configure(long newCheckInterval) {
307 long newInterval = newCheckInterval / 10 * 10;
308 if (checkInterval.getAndSet(newInterval) != newInterval) {
309 stop();
310 if (newInterval <= 0) {
311
312 lastTime.set(milliSecondFromNano());
313 } else {
314
315 start();
316 }
317 }
318 }
319
320
321
322
323
324
325
326 void bytesRecvFlowControl(long recv) {
327 currentReadBytes.addAndGet(recv);
328 cumulativeReadBytes.addAndGet(recv);
329 }
330
331
332
333
334
335
336
337 void bytesWriteFlowControl(long write) {
338 currentWrittenBytes.addAndGet(write);
339 cumulativeWrittenBytes.addAndGet(write);
340 }
341
342
343
344
345
346
347
348 void bytesRealWriteFlowControl(long write) {
349 realWrittenBytes.addAndGet(write);
350 }
351
352
353
354
355
356 public long checkInterval() {
357 return checkInterval.get();
358 }
359
360
361
362
363 public long lastReadThroughput() {
364 return lastReadThroughput;
365 }
366
367
368
369
370 public long lastWriteThroughput() {
371 return lastWriteThroughput;
372 }
373
374
375
376
377 public long lastReadBytes() {
378 return lastReadBytes;
379 }
380
381
382
383
384 public long lastWrittenBytes() {
385 return lastWrittenBytes;
386 }
387
388
389
390
391 public long currentReadBytes() {
392 return currentReadBytes.get();
393 }
394
395
396
397
398 public long currentWrittenBytes() {
399 return currentWrittenBytes.get();
400 }
401
402
403
404
405 public long lastTime() {
406 return lastTime.get();
407 }
408
409
410
411
412 public long cumulativeWrittenBytes() {
413 return cumulativeWrittenBytes.get();
414 }
415
416
417
418
419 public long cumulativeReadBytes() {
420 return cumulativeReadBytes.get();
421 }
422
423
424
425
426
427 public long lastCumulativeTime() {
428 return lastCumulativeTime;
429 }
430
431
432
433
434 public AtomicLong getRealWrittenBytes() {
435 return realWrittenBytes;
436 }
437
438
439
440
441 public long getRealWriteThroughput() {
442 return realWriteThroughput;
443 }
444
445
446
447
448
449 public void resetCumulativeTime() {
450 lastCumulativeTime = System.currentTimeMillis();
451 cumulativeReadBytes.set(0);
452 cumulativeWrittenBytes.set(0);
453 }
454
455
456
457
458 public String name() {
459 return name;
460 }
461
462
463
464
465
466
467
468
469
470
471
472
473
474 @Deprecated
475 public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
476 return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
477 }
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492 public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
493 bytesRecvFlowControl(size);
494 if (size == 0 || limitTraffic == 0) {
495 return 0;
496 }
497 final long lastTimeCheck = lastTime.get();
498 long sum = currentReadBytes.get();
499 long localReadingTime = readingTime;
500 long lastRB = lastReadBytes;
501 final long interval = now - lastTimeCheck;
502 long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
503 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
504
505 long time = sum * 1000 / limitTraffic - interval + pastDelay;
506 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
507 if (logger.isDebugEnabled()) {
508 logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
509 }
510 if (time > maxTime && now + time - localReadingTime > maxTime) {
511 time = maxTime;
512 }
513 readingTime = Math.max(localReadingTime, now + time);
514 return time;
515 }
516 readingTime = Math.max(localReadingTime, now);
517 return 0;
518 }
519
520 long lastsum = sum + lastRB;
521 long lastinterval = interval + checkInterval.get();
522 long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
523 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
524 if (logger.isDebugEnabled()) {
525 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
526 }
527 if (time > maxTime && now + time - localReadingTime > maxTime) {
528 time = maxTime;
529 }
530 readingTime = Math.max(localReadingTime, now + time);
531 return time;
532 }
533 readingTime = Math.max(localReadingTime, now);
534 return 0;
535 }
536
537
538
539
540
541
542
543
544
545
546
547
548
549 @Deprecated
550 public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
551 return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
552 }
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567 public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
568 bytesWriteFlowControl(size);
569 if (size == 0 || limitTraffic == 0) {
570 return 0;
571 }
572 final long lastTimeCheck = lastTime.get();
573 long sum = currentWrittenBytes.get();
574 long lastWB = lastWrittenBytes;
575 long localWritingTime = writingTime;
576 long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
577 final long interval = now - lastTimeCheck;
578 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
579
580 long time = sum * 1000 / limitTraffic - interval + pastDelay;
581 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
582 if (logger.isDebugEnabled()) {
583 logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
584 }
585 if (time > maxTime && now + time - localWritingTime > maxTime) {
586 time = maxTime;
587 }
588 writingTime = Math.max(localWritingTime, now + time);
589 return time;
590 }
591 writingTime = Math.max(localWritingTime, now);
592 return 0;
593 }
594
595 long lastsum = sum + lastWB;
596 long lastinterval = interval + checkInterval.get();
597 long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
598 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
599 if (logger.isDebugEnabled()) {
600 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
601 }
602 if (time > maxTime && now + time - localWritingTime > maxTime) {
603 time = maxTime;
604 }
605 writingTime = Math.max(localWritingTime, now + time);
606 return time;
607 }
608 writingTime = Math.max(localWritingTime, now);
609 return 0;
610 }
611
612 @Override
613 public String toString() {
614 return new StringBuilder(165).append("Monitor ").append(name)
615 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
616 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
617 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
618 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
619 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
620 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
621 }
622 }