1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.logging.InternalLogger;
19 import org.jboss.netty.logging.InternalLoggerFactory;
20 import org.jboss.netty.util.Timeout;
21 import org.jboss.netty.util.Timer;
22 import org.jboss.netty.util.TimerTask;
23
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicLong;
27
28
29
30
31
32
33
34
35
36
37 public class TrafficCounter {
38 private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
39
40
41
42
43 private final AtomicLong currentWrittenBytes = new AtomicLong();
44
45
46
47
48 private final AtomicLong currentReadBytes = new AtomicLong();
49
50
51
52
53 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
54
55
56
57
58 private final AtomicLong cumulativeReadBytes = new AtomicLong();
59
60
61
62
63 private long lastCumulativeTime;
64
65
66
67
68 private long lastWriteThroughput;
69
70
71
72
73 private long lastReadThroughput;
74
75
76
77
78 private final AtomicLong lastTime = new AtomicLong();
79
80
81
82
83 private long lastWrittenBytes;
84
85
86
87
88 private long lastReadBytes;
89
90
91
92
93 private long lastNonNullWrittenBytes;
94
95
96
97
98 private long lastNonNullWrittenTime;
99
100
101
102
103 private long lastNonNullReadTime;
104
105
106
107
108 private long lastNonNullReadBytes;
109
110
111
112
113 final AtomicLong checkInterval = new AtomicLong(
114 AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
115
116
117
118
119
120
121 final String name;
122
123
124
125
126 private final AbstractTrafficShapingHandler trafficShapingHandler;
127
128
129
130
131 private final Timer timer;
132
133
134
135 private TimerTask timerTask;
136
137
138
139 private volatile Timeout timeout;
140
141
142
143
144 final AtomicBoolean monitorActive = new AtomicBoolean();
145
146
147
148
149
150 private static class TrafficMonitoringTask implements TimerTask {
151
152
153
154 private final AbstractTrafficShapingHandler trafficShapingHandler1;
155
156
157
158
159 private final TrafficCounter counter;
160
161 protected TrafficMonitoringTask(
162 AbstractTrafficShapingHandler trafficShapingHandler,
163 TrafficCounter counter) {
164 trafficShapingHandler1 = trafficShapingHandler;
165 this.counter = counter;
166 }
167
168 public void run(Timeout timeout) throws Exception {
169 if (!counter.monitorActive.get()) {
170 return;
171 }
172 long endTime = System.currentTimeMillis();
173 counter.resetAccounting(endTime);
174 if (trafficShapingHandler1 != null) {
175 trafficShapingHandler1.doAccounting(counter);
176 }
177
178 counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
179 }
180 }
181
182
183
184
185 public void start() {
186 synchronized (lastTime) {
187 if (monitorActive.get()) {
188 return;
189 }
190 lastTime.set(System.currentTimeMillis());
191 if (checkInterval.get() > 0) {
192 monitorActive.set(true);
193 timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
194 timeout =
195 timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
196 }
197 }
198 }
199
200
201
202
203 public void stop() {
204 synchronized (lastTime) {
205 if (!monitorActive.get()) {
206 return;
207 }
208 monitorActive.set(false);
209 resetAccounting(System.currentTimeMillis());
210 if (trafficShapingHandler != null) {
211 trafficShapingHandler.doAccounting(this);
212 }
213 if (timeout != null) {
214 timeout.cancel();
215 }
216 }
217 }
218
219
220
221
222 void resetAccounting(long newLastTime) {
223 synchronized (lastTime) {
224 long interval = newLastTime - lastTime.getAndSet(newLastTime);
225 if (interval == 0) {
226
227 return;
228 }
229 lastReadBytes = currentReadBytes.getAndSet(0);
230 lastWrittenBytes = currentWrittenBytes.getAndSet(0);
231 lastReadThroughput = lastReadBytes * 1000 / interval;
232
233 lastWriteThroughput = lastWrittenBytes * 1000 / interval;
234
235 }
236 if (lastWrittenBytes > 0) {
237 lastNonNullWrittenBytes = lastWrittenBytes;
238 lastNonNullWrittenTime = newLastTime;
239 }
240 if (lastReadBytes > 0) {
241 lastNonNullReadBytes = lastReadBytes;
242 lastNonNullReadTime = newLastTime;
243 }
244 }
245
246
247
248
249
250
251
252
253
254
255
256
257 public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
258 Timer timer, String name, long checkInterval) {
259 this.trafficShapingHandler = trafficShapingHandler;
260 this.timer = timer;
261 this.name = name;
262 lastCumulativeTime = System.currentTimeMillis();
263 configure(checkInterval);
264 }
265
266
267
268
269
270 public void configure(long newcheckInterval) {
271 long newInterval = newcheckInterval / 10 * 10;
272 if (checkInterval.get() != newInterval) {
273 checkInterval.set(newInterval);
274 if (newInterval <= 0) {
275 stop();
276
277 lastTime.set(System.currentTimeMillis());
278 } else {
279
280 start();
281 }
282 }
283 }
284
285
286
287
288
289
290
291 void bytesRecvFlowControl(long recv) {
292 currentReadBytes.addAndGet(recv);
293 cumulativeReadBytes.addAndGet(recv);
294 }
295
296
297
298
299
300
301
302 void bytesWriteFlowControl(long write) {
303 currentWrittenBytes.addAndGet(write);
304 cumulativeWrittenBytes.addAndGet(write);
305 }
306
307
308
309
310
311
312 public long getCheckInterval() {
313 return checkInterval.get();
314 }
315
316
317
318
319
320 public long getLastReadThroughput() {
321 return lastReadThroughput;
322 }
323
324
325
326
327
328 public long getLastWriteThroughput() {
329 return lastWriteThroughput;
330 }
331
332
333
334
335
336 public long getLastReadBytes() {
337 return lastReadBytes;
338 }
339
340
341
342
343
344 public long getLastWrittenBytes() {
345 return lastWrittenBytes;
346 }
347
348
349
350
351
352 public long getCurrentReadBytes() {
353 return currentReadBytes.get();
354 }
355
356
357
358
359
360 public long getCurrentWrittenBytes() {
361 return currentWrittenBytes.get();
362 }
363
364
365
366
367 public long getLastTime() {
368 return lastTime.get();
369 }
370
371
372
373
374 public long getCumulativeWrittenBytes() {
375 return cumulativeWrittenBytes.get();
376 }
377
378
379
380
381 public long getCumulativeReadBytes() {
382 return cumulativeReadBytes.get();
383 }
384
385
386
387
388
389 public long getLastCumulativeTime() {
390 return lastCumulativeTime;
391 }
392
393
394
395
396 public void resetCumulativeTime() {
397 lastCumulativeTime = System.currentTimeMillis();
398 cumulativeReadBytes.set(0);
399 cumulativeWrittenBytes.set(0);
400 }
401
402
403
404
405
406
407
408
409
410
411
412
413
414 public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
415 final long now = System.currentTimeMillis();
416 bytesRecvFlowControl(size);
417 if (limitTraffic == 0) {
418 return 0;
419 }
420 long sum = currentReadBytes.get();
421 long interval = now - lastTime.get();
422
423 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
424 long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
425 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
426 if (logger.isDebugEnabled()) {
427 logger.debug("Time: " + time + ":" + sum + ":" + interval);
428 }
429 return time > maxTime ? maxTime : time;
430 }
431 return 0;
432 }
433
434 if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
435 long lastsum = sum + lastNonNullReadBytes;
436 long lastinterval = now - lastNonNullReadTime;
437 long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
438 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
439 if (logger.isDebugEnabled()) {
440 logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
441 }
442 return time > maxTime ? maxTime : time;
443 }
444 } else {
445
446 sum += lastReadBytes;
447 long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT;
448 long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
449 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
450 if (logger.isDebugEnabled()) {
451 logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
452 }
453 return time > maxTime ? maxTime : time;
454 }
455 }
456 return 0;
457 }
458
459
460
461
462
463
464
465
466
467
468
469
470
471 public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
472 bytesWriteFlowControl(size);
473 if (limitTraffic == 0) {
474 return 0;
475 }
476 long sum = currentWrittenBytes.get();
477 final long now = System.currentTimeMillis();
478 long interval = now - lastTime.get();
479 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
480 long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
481 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
482 if (logger.isDebugEnabled()) {
483 logger.debug("Time: " + time + ":" + sum + ":" + interval);
484 }
485 return time > maxTime ? maxTime : time;
486 }
487 return 0;
488 }
489 if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
490 long lastsum = sum + lastNonNullWrittenBytes;
491 long lastinterval = now - lastNonNullWrittenTime;
492 long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
493 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
494 if (logger.isDebugEnabled()) {
495 logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
496 }
497 return time > maxTime ? maxTime : time;
498 }
499 } else {
500 sum += lastWrittenBytes;
501 long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval);
502 long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
503 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
504 if (logger.isDebugEnabled()) {
505 logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
506 }
507 return time > maxTime ? maxTime : time;
508 }
509 }
510 return 0;
511 }
512
513
514
515
516 public String getName() {
517 return name;
518 }
519
520
521
522
523 @Override
524 public String toString() {
525 return "Monitor " + name + " Current Speed Read: " +
526 (lastReadThroughput >> 10) + " KB/s, Write: " +
527 (lastWriteThroughput >> 10) + " KB/s Current Read: " +
528 (currentReadBytes.get() >> 10) + " KB Current Write: " +
529 (currentWrittenBytes.get() >> 10) + " KB";
530 }
531 }