1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.logging.InternalLogger;
19 import org.jboss.netty.logging.InternalLoggerFactory;
20 import org.jboss.netty.util.Timeout;
21 import org.jboss.netty.util.Timer;
22 import org.jboss.netty.util.TimerTask;
23
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicLong;
26
27
28
29
30
31
32
33
34
35
36 public class TrafficCounter {
37 private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
38
39
40
41
42 public static long milliSecondFromNano() {
43 return System.nanoTime() / 1000000;
44 }
45
46
47
48
49 private final AtomicLong currentWrittenBytes = new AtomicLong();
50
51
52
53
54 private final AtomicLong currentReadBytes = new AtomicLong();
55
56
57
58
59 private long writingTime;
60
61
62
63
64 private long readingTime;
65
66
67
68
69 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
70
71
72
73
74 private final AtomicLong cumulativeReadBytes = new AtomicLong();
75
76
77
78
79 private long lastCumulativeTime;
80
81
82
83
84 private long lastWriteThroughput;
85
86
87
88
89 private long lastReadThroughput;
90
91
92
93
94 final AtomicLong lastTime = new AtomicLong();
95
96
97
98
99 private volatile long lastWrittenBytes;
100
101
102
103
104 private volatile long lastReadBytes;
105
106
107
108
109 private volatile long lastWritingTime;
110
111
112
113
114 private volatile long lastReadingTime;
115
116
117
118
119 private final AtomicLong realWrittenBytes = new AtomicLong();
120
121
122
123
124 private long realWriteThroughput;
125
126
127
128
129 final AtomicLong checkInterval = new AtomicLong(
130 AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
131
132
133
134
135
136
137 final String name;
138
139
140
141
142 final AbstractTrafficShapingHandler trafficShapingHandler;
143
144
145
146
147 final Timer timer;
148
149
150
151
152 TimerTask timerTask;
153
154
155
156
157 volatile Timeout timeout;
158
159
160
161
162 volatile boolean monitorActive;
163
164
165
166
167
168 private static final class TrafficMonitoringTask implements TimerTask {
169
170
171
172 private final AbstractTrafficShapingHandler trafficShapingHandler1;
173
174
175
176
177 private final TrafficCounter counter;
178
179 TrafficMonitoringTask(
180 AbstractTrafficShapingHandler trafficShapingHandler,
181 TrafficCounter counter) {
182 trafficShapingHandler1 = trafficShapingHandler;
183 this.counter = counter;
184 }
185
186 public void run(Timeout timeout) throws Exception {
187 if (!counter.monitorActive) {
188 return;
189 }
190 counter.resetAccounting(milliSecondFromNano());
191 if (trafficShapingHandler1 != null) {
192 trafficShapingHandler1.doAccounting(counter);
193 }
194
195 counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
196 }
197 }
198
199
200
201
202 public void start() {
203 if (monitorActive) {
204 return;
205 }
206 lastTime.set(milliSecondFromNano());
207
208 if (checkInterval.get() > 0 && timer != null) {
209 monitorActive = true;
210 timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
211 timeout =
212 timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
213 }
214 }
215
216
217
218
219 public void stop() {
220 if (!monitorActive) {
221 return;
222 }
223 monitorActive = false;
224 resetAccounting(milliSecondFromNano());
225 if (trafficShapingHandler != null) {
226 trafficShapingHandler.doAccounting(this);
227 }
228 if (timeout != null) {
229 timeout.cancel();
230 }
231 }
232
233
234
235
236 void resetAccounting(long newLastTime) {
237 long interval = newLastTime - lastTime.getAndSet(newLastTime);
238 if (interval == 0) {
239
240 return;
241 }
242 lastReadBytes = currentReadBytes.getAndSet(0);
243 lastWrittenBytes = currentWrittenBytes.getAndSet(0);
244 lastReadThroughput = lastReadBytes * 1000 / interval;
245
246 lastWriteThroughput = lastWrittenBytes * 1000 / interval;
247
248 realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
249 lastWritingTime = Math.max(lastWritingTime, writingTime);
250 lastReadingTime = Math.max(lastReadingTime, readingTime);
251 }
252
253
254
255
256
257
258
259
260
261
262
263
264
265 public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
266 Timer timer, String name, long checkInterval) {
267 if (trafficShapingHandler == null) {
268 throw new IllegalArgumentException("TrafficShapingHandler must not be null");
269 }
270 this.trafficShapingHandler = trafficShapingHandler;
271 this.timer = timer;
272 this.name = name;
273
274 lastCumulativeTime = System.currentTimeMillis();
275 writingTime = milliSecondFromNano();
276 readingTime = writingTime;
277 lastWritingTime = writingTime;
278 lastReadingTime = writingTime;
279 configure(checkInterval);
280 }
281
282
283
284
285
286 public void configure(long newcheckInterval) {
287 long newInterval = newcheckInterval / 10 * 10;
288 if (checkInterval.getAndSet(newInterval) != newInterval) {
289 if (newInterval <= 0) {
290 stop();
291
292 lastTime.set(milliSecondFromNano());
293 } else {
294
295 start();
296 }
297 }
298 }
299
300
301
302
303
304
305
306 void bytesRecvFlowControl(long recv) {
307 currentReadBytes.addAndGet(recv);
308 cumulativeReadBytes.addAndGet(recv);
309 }
310
311
312
313
314
315
316
317 void bytesWriteFlowControl(long write) {
318 currentWrittenBytes.addAndGet(write);
319 cumulativeWrittenBytes.addAndGet(write);
320 }
321
322
323
324
325
326
327
328 void bytesRealWriteFlowControl(long write) {
329 realWrittenBytes.addAndGet(write);
330 }
331
332
333
334
335
336 public long getCheckInterval() {
337 return checkInterval.get();
338 }
339
340
341
342
343 public long getLastReadThroughput() {
344 return lastReadThroughput;
345 }
346
347
348
349
350 public long getLastWriteThroughput() {
351 return lastWriteThroughput;
352 }
353
354
355
356
357 public long getLastReadBytes() {
358 return lastReadBytes;
359 }
360
361
362
363
364 public long getLastWrittenBytes() {
365 return lastWrittenBytes;
366 }
367
368
369
370
371 public long getCurrentReadBytes() {
372 return currentReadBytes.get();
373 }
374
375
376
377
378 public long getCurrentWrittenBytes() {
379 return currentWrittenBytes.get();
380 }
381
382
383
384
385 public long getLastTime() {
386 return lastTime.get();
387 }
388
389
390
391
392 public long getCumulativeWrittenBytes() {
393 return cumulativeWrittenBytes.get();
394 }
395
396
397
398
399 public long getCumulativeReadBytes() {
400 return cumulativeReadBytes.get();
401 }
402
403
404
405
406
407 public long getLastCumulativeTime() {
408 return lastCumulativeTime;
409 }
410
411
412
413
414 public AtomicLong getRealWrittenBytes() {
415 return realWrittenBytes;
416 }
417
418
419
420
421 public long getRealWriteThroughput() {
422 return realWriteThroughput;
423 }
424
425
426
427
428
429 public void resetCumulativeTime() {
430 lastCumulativeTime = System.currentTimeMillis();
431 cumulativeReadBytes.set(0);
432 cumulativeWrittenBytes.set(0);
433 }
434
435
436
437
438
439
440
441
442
443
444
445
446
447 @Deprecated
448 public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
449 return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
450 }
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465 public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
466 bytesRecvFlowControl(size);
467 if (size == 0 || limitTraffic == 0) {
468 return 0;
469 }
470 final long lastTimeCheck = lastTime.get();
471 long sum = currentReadBytes.get();
472 long localReadingTime = readingTime;
473 long lastRB = lastReadBytes;
474 final long interval = now - lastTimeCheck;
475 long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
476 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
477
478 long time = (sum * 1000 / limitTraffic - interval + pastDelay) / 10 * 10;
479 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
480 if (logger.isDebugEnabled()) {
481 logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
482 }
483 if (time > maxTime && now + time - localReadingTime > maxTime) {
484 time = maxTime;
485 }
486 readingTime = Math.max(localReadingTime, now + time);
487 return time;
488 }
489 readingTime = Math.max(localReadingTime, now);
490 return 0;
491 }
492
493 long lastsum = sum + lastRB;
494 long lastinterval = interval + checkInterval.get();
495 long time = (lastsum * 1000 / limitTraffic - lastinterval + pastDelay) / 10 * 10;
496 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
497 if (logger.isDebugEnabled()) {
498 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
499 }
500 if (time > maxTime && now + time - localReadingTime > maxTime) {
501 time = maxTime;
502 }
503 readingTime = Math.max(localReadingTime, now + time);
504 return time;
505 }
506 readingTime = Math.max(localReadingTime, now);
507 return 0;
508 }
509
510
511
512
513
514
515
516
517
518
519
520
521
522 @Deprecated
523 public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
524 return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
525 }
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540 public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
541 bytesWriteFlowControl(size);
542 if (size == 0 || limitTraffic == 0) {
543 return 0;
544 }
545 final long lastTimeCheck = lastTime.get();
546 long sum = currentWrittenBytes.get();
547 long lastWB = lastWrittenBytes;
548 long localWritingTime = writingTime;
549 long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
550 final long interval = now - lastTimeCheck;
551 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
552
553 long time = (sum * 1000 / limitTraffic - interval + pastDelay) / 10 * 10;
554 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
555 if (logger.isDebugEnabled()) {
556 logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
557 }
558 if (time > maxTime && now + time - localWritingTime > maxTime) {
559 time = maxTime;
560 }
561 writingTime = Math.max(localWritingTime, now + time);
562 return time;
563 }
564 writingTime = Math.max(localWritingTime, now);
565 return 0;
566 }
567
568 long lastsum = sum + lastWB;
569 long lastinterval = interval + checkInterval.get();
570 long time = (lastsum * 1000 / limitTraffic - lastinterval + pastDelay) / 10 * 10;
571 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
572 if (logger.isDebugEnabled()) {
573 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
574 }
575 if (time > maxTime && now + time - localWritingTime > maxTime) {
576 time = maxTime;
577 }
578 writingTime = Math.max(localWritingTime, now + time);
579 return time;
580 }
581 writingTime = Math.max(localWritingTime, now);
582 return 0;
583 }
584
585
586
587
588 public String getName() {
589 return name;
590 }
591
592
593
594
595 @Override
596 public String toString() {
597 return new StringBuilder(165).append("Monitor ").append(name)
598 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
599 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
600 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
601 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
602 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
603 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
604 }
605 }