1
2
3
4
5
6
7
8
9
10
11
12
13 package io.netty.handler.traffic;
14
15 import io.netty.util.internal.logging.InternalLogger;
16 import io.netty.util.internal.logging.InternalLoggerFactory;
17
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22
23
24
25
26
27
28
29
30
31
32 public class TrafficCounter {
33
34 private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
35
36
37
38
39 public static long milliSecondFromNano() {
40 return System.nanoTime() / 1000000;
41 }
42
43
44
45
46 private final AtomicLong currentWrittenBytes = new AtomicLong();
47
48
49
50
51 private final AtomicLong currentReadBytes = new AtomicLong();
52
53
54
55
56 private long writingTime;
57
58
59
60
61 private long readingTime;
62
63
64
65
66 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
67
68
69
70
71 private final AtomicLong cumulativeReadBytes = new AtomicLong();
72
73
74
75
76 private long lastCumulativeTime;
77
78
79
80
81 private long lastWriteThroughput;
82
83
84
85
86 private long lastReadThroughput;
87
88
89
90
91 final AtomicLong lastTime = new AtomicLong();
92
93
94
95
96 private volatile long lastWrittenBytes;
97
98
99
100
101 private volatile long lastReadBytes;
102
103
104
105
106 private volatile long lastWritingTime;
107
108
109
110
111 private volatile long lastReadingTime;
112
113
114
115
116 private final AtomicLong realWrittenBytes = new AtomicLong();
117
118
119
120
121 private long realWriteThroughput;
122
123
124
125
126 final AtomicLong checkInterval = new AtomicLong(AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
127
128
129
130
131
132
133 final String name;
134
135
136
137
138 final AbstractTrafficShapingHandler trafficShapingHandler;
139
140
141
142
143 final ScheduledExecutorService executor;
144
145
146
147 Runnable monitor;
148
149
150
151 volatile ScheduledFuture<?> scheduledFuture;
152
153
154
155
156 volatile boolean monitorActive;
157
158
159
160
161
162 private final class TrafficMonitoringTask implements Runnable {
163 @Override
164 public void run() {
165 if (!monitorActive) {
166 return;
167 }
168 resetAccounting(milliSecondFromNano());
169 if (trafficShapingHandler != null) {
170 trafficShapingHandler.doAccounting(TrafficCounter.this);
171 }
172 scheduledFuture = executor.schedule(this, checkInterval.get(), TimeUnit.MILLISECONDS);
173 }
174 }
175
176
177
178
179 public synchronized void start() {
180 if (monitorActive) {
181 return;
182 }
183 lastTime.set(milliSecondFromNano());
184 long localCheckInterval = checkInterval.get();
185
186 if (localCheckInterval > 0 && executor != null) {
187 monitorActive = true;
188 monitor = new TrafficMonitoringTask();
189 scheduledFuture =
190 executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
191 }
192 }
193
194
195
196
197 public synchronized void stop() {
198 if (!monitorActive) {
199 return;
200 }
201 monitorActive = false;
202 resetAccounting(milliSecondFromNano());
203 if (trafficShapingHandler != null) {
204 trafficShapingHandler.doAccounting(this);
205 }
206 if (scheduledFuture != null) {
207 scheduledFuture.cancel(true);
208 }
209 }
210
211
212
213
214
215
216 synchronized void resetAccounting(long newLastTime) {
217 long interval = newLastTime - lastTime.getAndSet(newLastTime);
218 if (interval == 0) {
219
220 return;
221 }
222 if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
223 logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
224 }
225 lastReadBytes = currentReadBytes.getAndSet(0);
226 lastWrittenBytes = currentWrittenBytes.getAndSet(0);
227 lastReadThroughput = lastReadBytes * 1000 / interval;
228
229 lastWriteThroughput = lastWrittenBytes * 1000 / interval;
230
231 realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
232 lastWritingTime = Math.max(lastWritingTime, writingTime);
233 lastReadingTime = Math.max(lastReadingTime, readingTime);
234 }
235
236
237
238
239
240
241
242
243
244
245
246
247
248 public TrafficCounter(ScheduledExecutorService executor, String name, long checkInterval) {
249 if (name == null) {
250 throw new NullPointerException("name");
251 }
252
253 trafficShapingHandler = null;
254 this.executor = executor;
255 this.name = name;
256
257 init(checkInterval);
258 }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274 public TrafficCounter(
275 AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
276 String name, long checkInterval) {
277
278 if (trafficShapingHandler == null) {
279 throw new IllegalArgumentException("trafficShapingHandler");
280 }
281 if (name == null) {
282 throw new NullPointerException("name");
283 }
284
285 this.trafficShapingHandler = trafficShapingHandler;
286 this.executor = executor;
287 this.name = name;
288
289 init(checkInterval);
290 }
291
292 private void init(long checkInterval) {
293
294 lastCumulativeTime = System.currentTimeMillis();
295 writingTime = milliSecondFromNano();
296 readingTime = writingTime;
297 lastWritingTime = writingTime;
298 lastReadingTime = writingTime;
299 configure(checkInterval);
300 }
301
302
303
304
305
306
307 public void configure(long newCheckInterval) {
308 long newInterval = newCheckInterval / 10 * 10;
309 if (checkInterval.getAndSet(newInterval) != newInterval) {
310 if (newInterval <= 0) {
311 stop();
312
313 lastTime.set(milliSecondFromNano());
314 } else {
315
316 start();
317 }
318 }
319 }
320
321
322
323
324
325
326
327 void bytesRecvFlowControl(long recv) {
328 currentReadBytes.addAndGet(recv);
329 cumulativeReadBytes.addAndGet(recv);
330 }
331
332
333
334
335
336
337
338 void bytesWriteFlowControl(long write) {
339 currentWrittenBytes.addAndGet(write);
340 cumulativeWrittenBytes.addAndGet(write);
341 }
342
343
344
345
346
347
348
349 void bytesRealWriteFlowControl(long write) {
350 realWrittenBytes.addAndGet(write);
351 }
352
353
354
355
356
357 public long checkInterval() {
358 return checkInterval.get();
359 }
360
361
362
363
364 public long lastReadThroughput() {
365 return lastReadThroughput;
366 }
367
368
369
370
371 public long lastWriteThroughput() {
372 return lastWriteThroughput;
373 }
374
375
376
377
378 public long lastReadBytes() {
379 return lastReadBytes;
380 }
381
382
383
384
385 public long lastWrittenBytes() {
386 return lastWrittenBytes;
387 }
388
389
390
391
392 public long currentReadBytes() {
393 return currentReadBytes.get();
394 }
395
396
397
398
399 public long currentWrittenBytes() {
400 return currentWrittenBytes.get();
401 }
402
403
404
405
406 public long lastTime() {
407 return lastTime.get();
408 }
409
410
411
412
413 public long cumulativeWrittenBytes() {
414 return cumulativeWrittenBytes.get();
415 }
416
417
418
419
420 public long cumulativeReadBytes() {
421 return cumulativeReadBytes.get();
422 }
423
424
425
426
427
428 public long lastCumulativeTime() {
429 return lastCumulativeTime;
430 }
431
432
433
434
435 public AtomicLong getRealWrittenBytes() {
436 return realWrittenBytes;
437 }
438
439
440
441
442 public long getRealWriteThroughput() {
443 return realWriteThroughput;
444 }
445
446
447
448
449
450 public void resetCumulativeTime() {
451 lastCumulativeTime = System.currentTimeMillis();
452 cumulativeReadBytes.set(0);
453 cumulativeWrittenBytes.set(0);
454 }
455
456
457
458
459 public String name() {
460 return name;
461 }
462
463
464
465
466
467
468
469
470
471
472
473
474
475 @Deprecated
476 public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
477 return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
478 }
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493 public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
494 bytesRecvFlowControl(size);
495 if (size == 0 || limitTraffic == 0) {
496 return 0;
497 }
498 final long lastTimeCheck = lastTime.get();
499 long sum = currentReadBytes.get();
500 long localReadingTime = readingTime;
501 long lastRB = lastReadBytes;
502 final long interval = now - lastTimeCheck;
503 long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
504 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
505
506 long time = sum * 1000 / limitTraffic - interval + pastDelay;
507 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
508 if (logger.isDebugEnabled()) {
509 logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
510 }
511 if (time > maxTime && now + time - localReadingTime > maxTime) {
512 time = maxTime;
513 }
514 readingTime = Math.max(localReadingTime, now + time);
515 return time;
516 }
517 readingTime = Math.max(localReadingTime, now);
518 return 0;
519 }
520
521 long lastsum = sum + lastRB;
522 long lastinterval = interval + checkInterval.get();
523 long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
524 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
525 if (logger.isDebugEnabled()) {
526 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
527 }
528 if (time > maxTime && now + time - localReadingTime > maxTime) {
529 time = maxTime;
530 }
531 readingTime = Math.max(localReadingTime, now + time);
532 return time;
533 }
534 readingTime = Math.max(localReadingTime, now);
535 return 0;
536 }
537
538
539
540
541
542
543
544
545
546
547
548
549
550 @Deprecated
551 public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
552 return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
553 }
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568 public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
569 bytesWriteFlowControl(size);
570 if (size == 0 || limitTraffic == 0) {
571 return 0;
572 }
573 final long lastTimeCheck = lastTime.get();
574 long sum = currentWrittenBytes.get();
575 long lastWB = lastWrittenBytes;
576 long localWritingTime = writingTime;
577 long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
578 final long interval = now - lastTimeCheck;
579 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
580
581 long time = sum * 1000 / limitTraffic - interval + pastDelay;
582 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
583 if (logger.isDebugEnabled()) {
584 logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
585 }
586 if (time > maxTime && now + time - localWritingTime > maxTime) {
587 time = maxTime;
588 }
589 writingTime = Math.max(localWritingTime, now + time);
590 return time;
591 }
592 writingTime = Math.max(localWritingTime, now);
593 return 0;
594 }
595
596 long lastsum = sum + lastWB;
597 long lastinterval = interval + checkInterval.get();
598 long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
599 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
600 if (logger.isDebugEnabled()) {
601 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
602 }
603 if (time > maxTime && now + time - localWritingTime > maxTime) {
604 time = maxTime;
605 }
606 writingTime = Math.max(localWritingTime, now + time);
607 return time;
608 }
609 writingTime = Math.max(localWritingTime, now);
610 return 0;
611 }
612
613 @Override
614 public String toString() {
615 return new StringBuilder(165).append("Monitor ").append(name)
616 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
617 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
618 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
619 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
620 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
621 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
622 }
623 }