1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.util.Timeout;
19 import org.jboss.netty.util.Timer;
20 import org.jboss.netty.util.TimerTask;
21
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicLong;
25
26
27
28
29
30
31
32
33
34
35 public class TrafficCounter {
36
37
38
39 private final AtomicLong currentWrittenBytes = new AtomicLong();
40
41
42
43
44 private final AtomicLong currentReadBytes = new AtomicLong();
45
46
47
48
49 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
50
51
52
53
54 private final AtomicLong cumulativeReadBytes = new AtomicLong();
55
56
57
58
59 private long lastCumulativeTime;
60
61
62
63
64 private long lastWriteThroughput;
65
66
67
68
69 private long lastReadThroughput;
70
71
72
73
74 private final AtomicLong lastTime = new AtomicLong();
75
76
77
78
79 private long lastWrittenBytes;
80
81
82
83
84 private long lastReadBytes;
85
86
87
88
89 final AtomicLong checkInterval = new AtomicLong(
90 AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
91
92
93
94
95
96
97 final String name;
98
99
100
101
102 private final AbstractTrafficShapingHandler trafficShapingHandler;
103
104
105
106
107 private final Timer timer;
108
109
110
111 private TimerTask timerTask;
112
113
114
115 private volatile Timeout timeout;
116
117
118
119
120 final AtomicBoolean monitorActive = new AtomicBoolean();
121
122
123
124
125
126 private static class TrafficMonitoringTask implements TimerTask {
127
128
129
130 private final AbstractTrafficShapingHandler trafficShapingHandler1;
131
132
133
134
135 private final TrafficCounter counter;
136
137 protected TrafficMonitoringTask(
138 AbstractTrafficShapingHandler trafficShapingHandler,
139 TrafficCounter counter) {
140 trafficShapingHandler1 = trafficShapingHandler;
141 this.counter = counter;
142 }
143
144 public void run(Timeout timeout) throws Exception {
145 if (!counter.monitorActive.get()) {
146 return;
147 }
148 long endTime = System.currentTimeMillis();
149 counter.resetAccounting(endTime);
150 if (trafficShapingHandler1 != null) {
151 trafficShapingHandler1.doAccounting(counter);
152 }
153
154 counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
155 }
156 }
157
158
159
160
161 public void start() {
162 synchronized (lastTime) {
163 if (monitorActive.get()) {
164 return;
165 }
166 lastTime.set(System.currentTimeMillis());
167 if (checkInterval.get() > 0) {
168 monitorActive.set(true);
169 timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
170 timeout =
171 timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
172 }
173 }
174 }
175
176
177
178
179 public void stop() {
180 synchronized (lastTime) {
181 if (!monitorActive.get()) {
182 return;
183 }
184 monitorActive.set(false);
185 resetAccounting(System.currentTimeMillis());
186 if (trafficShapingHandler != null) {
187 trafficShapingHandler.doAccounting(this);
188 }
189 if (timeout != null) {
190 timeout.cancel();
191 }
192 }
193 }
194
195
196
197
198 void resetAccounting(long newLastTime) {
199 synchronized (lastTime) {
200 long interval = newLastTime - lastTime.getAndSet(newLastTime);
201 if (interval == 0) {
202
203 return;
204 }
205 lastReadBytes = currentReadBytes.getAndSet(0);
206 lastWrittenBytes = currentWrittenBytes.getAndSet(0);
207 lastReadThroughput = lastReadBytes / interval * 1000;
208
209 lastWriteThroughput = lastWrittenBytes / interval * 1000;
210
211 }
212 }
213
214
215
216
217
218
219
220
221
222
223
224
225 public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
226 Timer timer, String name, long checkInterval) {
227 this.trafficShapingHandler = trafficShapingHandler;
228 this.timer = timer;
229 this.name = name;
230 lastCumulativeTime = System.currentTimeMillis();
231 configure(checkInterval);
232 }
233
234
235
236
237
238 public void configure(long newcheckInterval) {
239 long newInterval = newcheckInterval / 10 * 10;
240 if (checkInterval.get() != newInterval) {
241 checkInterval.set(newInterval);
242 if (newInterval <= 0) {
243 stop();
244
245 lastTime.set(System.currentTimeMillis());
246 } else {
247
248 start();
249 }
250 }
251 }
252
253
254
255
256
257
258
259 void bytesRecvFlowControl(long recv) {
260 currentReadBytes.addAndGet(recv);
261 cumulativeReadBytes.addAndGet(recv);
262 }
263
264
265
266
267
268
269
270 void bytesWriteFlowControl(long write) {
271 currentWrittenBytes.addAndGet(write);
272 cumulativeWrittenBytes.addAndGet(write);
273 }
274
275
276
277
278
279
280 public long getCheckInterval() {
281 return checkInterval.get();
282 }
283
284
285
286
287
288 public long getLastReadThroughput() {
289 return lastReadThroughput;
290 }
291
292
293
294
295
296 public long getLastWriteThroughput() {
297 return lastWriteThroughput;
298 }
299
300
301
302
303
304 public long getLastReadBytes() {
305 return lastReadBytes;
306 }
307
308
309
310
311
312 public long getLastWrittenBytes() {
313 return lastWrittenBytes;
314 }
315
316
317
318
319
320 public long getCurrentReadBytes() {
321 return currentReadBytes.get();
322 }
323
324
325
326
327
328 public long getCurrentWrittenBytes() {
329 return currentWrittenBytes.get();
330 }
331
332
333
334
335 public long getLastTime() {
336 return lastTime.get();
337 }
338
339
340
341
342 public long getCumulativeWrittenBytes() {
343 return cumulativeWrittenBytes.get();
344 }
345
346
347
348
349 public long getCumulativeReadBytes() {
350 return cumulativeReadBytes.get();
351 }
352
353
354
355
356
357 public long getLastCumulativeTime() {
358 return lastCumulativeTime;
359 }
360
361
362
363
364 public void resetCumulativeTime() {
365 lastCumulativeTime = System.currentTimeMillis();
366 cumulativeReadBytes.set(0);
367 cumulativeWrittenBytes.set(0);
368 }
369
370
371
372
373 public String getName() {
374 return name;
375 }
376
377
378
379
380 @Override
381 public String toString() {
382 return "Monitor " + name + " Current Speed Read: " +
383 (lastReadThroughput >> 10) + " KB/s, Write: " +
384 (lastWriteThroughput >> 10) + " KB/s Current Read: " +
385 (currentReadBytes.get() >> 10) + " KB Current Write: " +
386 (currentWrittenBytes.get() >> 10) + " KB";
387 }
388 }