1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20 import java.util.concurrent.atomic.AtomicLong;
21
22 import org.jboss.netty.channel.ChannelHandlerContext;
23 import org.jboss.netty.util.Timeout;
24 import org.jboss.netty.util.Timer;
25 import org.jboss.netty.util.TimerTask;
26
27
28
29
30
31
32
33
34
35
36 public class TrafficCounter {
37
38
39
40 private final AtomicLong currentWrittenBytes = new AtomicLong();
41
42
43
44
45 private final AtomicLong currentReadBytes = new AtomicLong();
46
47
48
49
50 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
51
52
53
54
55 private final AtomicLong cumulativeReadBytes = new AtomicLong();
56
57
58
59
60 private long lastCumulativeTime;
61
62
63
64
65 private long lastWriteThroughput;
66
67
68
69
70 private long lastReadThroughput;
71
72
73
74
75 private final AtomicLong lastTime = new AtomicLong();
76
77
78
79
80 private long lastWrittenBytes;
81
82
83
84
85 private long lastReadBytes;
86
87
88
89
90 AtomicLong checkInterval = new AtomicLong(
91 AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
92
93
94
95
96
97
98 final String name;
99
100
101
102
103 private final AbstractTrafficShapingHandler trafficShapingHandler;
104
105
106
107
108 private final Timer timer;
109
110
111
112 private TimerTask timerTask;
113
114
115
116 private volatile Timeout timeout;
117
118
119
120
121 AtomicBoolean monitorActive = new AtomicBoolean();
122
123
124
125
126
127 private static class TrafficMonitoringTask implements TimerTask {
128
129
130
131 private final AbstractTrafficShapingHandler trafficShapingHandler1;
132
133
134
135
136 private final TrafficCounter counter;
137
138
139
140
141
142 protected TrafficMonitoringTask(
143 AbstractTrafficShapingHandler trafficShapingHandler,
144 TrafficCounter counter) {
145 trafficShapingHandler1 = trafficShapingHandler;
146 this.counter = counter;
147 }
148
149 public void run(Timeout timeout) throws Exception {
150 if (!counter.monitorActive.get()) {
151 return;
152 }
153 long endTime = System.currentTimeMillis();
154 counter.resetAccounting(endTime);
155 if (trafficShapingHandler1 != null) {
156 trafficShapingHandler1.doAccounting(counter);
157 }
158 timeout =
159 counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
160 }
161 }
162
163
164
165
166 public void start() {
167 synchronized (lastTime) {
168 if (monitorActive.get()) {
169 return;
170 }
171 lastTime.set(System.currentTimeMillis());
172 if (checkInterval.get() > 0) {
173 monitorActive.set(true);
174 timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
175 timeout =
176 timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
177 }
178 }
179 }
180
181
182
183
184 public void stop() {
185 synchronized (lastTime) {
186 if (!monitorActive.get()) {
187 return;
188 }
189 monitorActive.set(false);
190 resetAccounting(System.currentTimeMillis());
191 if (trafficShapingHandler != null) {
192 trafficShapingHandler.doAccounting(this);
193 }
194 if (timeout != null) {
195 timeout.cancel();
196 }
197 }
198 }
199
200
201
202
203
204
205 void resetAccounting(long newLastTime) {
206 synchronized (lastTime) {
207 long interval = newLastTime - lastTime.getAndSet(newLastTime);
208 if (interval == 0) {
209
210 return;
211 }
212 lastReadBytes = currentReadBytes.getAndSet(0);
213 lastWrittenBytes = currentWrittenBytes.getAndSet(0);
214 lastReadThroughput = lastReadBytes / interval * 1000;
215
216 lastWriteThroughput = lastWrittenBytes / interval * 1000;
217
218 }
219 }
220
221
222
223
224
225
226
227
228
229
230
231
232 public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
233 Timer timer, String name, long checkInterval) {
234 this.trafficShapingHandler = trafficShapingHandler;
235 this.timer = timer;
236 this.name = name;
237 lastCumulativeTime = System.currentTimeMillis();
238 configure(checkInterval);
239 }
240
241
242
243
244
245
246
247 public void configure(long newcheckInterval) {
248 long newInterval = newcheckInterval / 10 * 10;
249 if (checkInterval.get() != newInterval) {
250 checkInterval.set(newInterval);
251 if (newInterval <= 0) {
252 stop();
253
254 lastTime.set(System.currentTimeMillis());
255 } else {
256
257 start();
258 }
259 }
260 }
261
262
263
264
265
266
267
268
269
270 void bytesRecvFlowControl(ChannelHandlerContext ctx, long recv) {
271 currentReadBytes.addAndGet(recv);
272 cumulativeReadBytes.addAndGet(recv);
273 }
274
275
276
277
278
279
280
281 void bytesWriteFlowControl(long write) {
282 currentWrittenBytes.addAndGet(write);
283 cumulativeWrittenBytes.addAndGet(write);
284 }
285
286
287
288
289
290
291 public long getCheckInterval() {
292 return checkInterval.get();
293 }
294
295
296
297
298
299 public long getLastReadThroughput() {
300 return lastReadThroughput;
301 }
302
303
304
305
306
307 public long getLastWriteThroughput() {
308 return lastWriteThroughput;
309 }
310
311
312
313
314
315 public long getLastReadBytes() {
316 return lastReadBytes;
317 }
318
319
320
321
322
323 public long getLastWrittenBytes() {
324 return lastWrittenBytes;
325 }
326
327
328
329
330
331 public long getCurrentReadBytes() {
332 return currentReadBytes.get();
333 }
334
335
336
337
338
339 public long getCurrentWrittenBytes() {
340 return currentWrittenBytes.get();
341 }
342
343
344
345
346 public long getLastTime() {
347 return lastTime.get();
348 }
349
350
351
352
353 public long getCumulativeWrittenBytes() {
354 return cumulativeWrittenBytes.get();
355 }
356
357
358
359
360 public long getCumulativeReadBytes() {
361 return cumulativeReadBytes.get();
362 }
363
364
365
366
367
368 public long getLastCumulativeTime() {
369 return lastCumulativeTime;
370 }
371
372
373
374
375 public void resetCumulativeTime() {
376 lastCumulativeTime = System.currentTimeMillis();
377 cumulativeReadBytes.set(0);
378 cumulativeWrittenBytes.set(0);
379 }
380
381
382
383
384 public String getName() {
385 return name;
386 }
387
388
389
390
391 @Override
392 public String toString() {
393 return "Monitor " + name + " Current Speed Read: " +
394 (lastReadThroughput >> 10) + " KB/s, Write: " +
395 (lastWriteThroughput >> 10) + " KB/s Current Read: " +
396 (currentReadBytes.get() >> 10) + " KB Current Write: " +
397 (currentWrittenBytes.get() >> 10) + " KB";
398 }
399 }