1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelEvent;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.ChannelState;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.MessageEvent;
27 import org.jboss.netty.channel.SimpleChannelHandler;
28 import org.jboss.netty.logging.InternalLogger;
29 import org.jboss.netty.logging.InternalLoggerFactory;
30 import org.jboss.netty.util.DefaultObjectSizeEstimator;
31 import org.jboss.netty.util.ExternalResourceReleasable;
32 import org.jboss.netty.util.ObjectSizeEstimator;
33 import org.jboss.netty.util.Timeout;
34 import org.jboss.netty.util.Timer;
35 import org.jboss.netty.util.TimerTask;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public abstract class AbstractTrafficShapingHandler extends
60 SimpleChannelHandler implements ExternalResourceReleasable {
61
62
63
64 static InternalLogger logger = InternalLoggerFactory
65 .getInstance(AbstractTrafficShapingHandler.class);
66
67
68
69
70 public static final long DEFAULT_CHECK_INTERVAL = 1000;
71
72
73
74
75 private static final long MINIMAL_WAIT = 10;
76
77
78
79
80 protected TrafficCounter trafficCounter;
81
82
83
84
85 private ObjectSizeEstimator objectSizeEstimator;
86
87
88
89
90 protected Timer timer;
91
92
93
94
95 private volatile Timeout timeout;
96
97
98
99
100 private long writeLimit;
101
102
103
104
105 private long readLimit;
106
107
108
109
110 protected long checkInterval = DEFAULT_CHECK_INTERVAL;
111
112
113
114
115
116
117 final AtomicBoolean release = new AtomicBoolean(false);
118
119 private void init(ObjectSizeEstimator newObjectSizeEstimator,
120 Timer newTimer, long newWriteLimit, long newReadLimit,
121 long newCheckInterval) {
122 objectSizeEstimator = newObjectSizeEstimator;
123 timer = newTimer;
124 writeLimit = newWriteLimit;
125 readLimit = newReadLimit;
126 checkInterval = newCheckInterval;
127
128 }
129
130
131
132
133
134 void setTrafficCounter(TrafficCounter newTrafficCounter) {
135 trafficCounter = newTrafficCounter;
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
152 long readLimit, long checkInterval) {
153 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172 protected AbstractTrafficShapingHandler(
173 ObjectSizeEstimator objectSizeEstimator, Timer timer,
174 long writeLimit, long readLimit, long checkInterval) {
175 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
176 }
177
178
179
180
181
182
183
184
185
186
187
188 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
189 long readLimit) {
190 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206 protected AbstractTrafficShapingHandler(
207 ObjectSizeEstimator objectSizeEstimator, Timer timer,
208 long writeLimit, long readLimit) {
209 init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
210 }
211
212
213
214
215
216
217
218 protected AbstractTrafficShapingHandler(Timer timer) {
219 init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
220 }
221
222
223
224
225
226
227
228
229
230
231 protected AbstractTrafficShapingHandler(
232 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
233 init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
234 }
235
236
237
238
239
240
241
242
243
244
245 protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
246 init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
247 }
248
249
250
251
252
253
254
255
256
257
258
259
260
261 protected AbstractTrafficShapingHandler(
262 ObjectSizeEstimator objectSizeEstimator, Timer timer,
263 long checkInterval) {
264 init(objectSizeEstimator, timer, 0, 0, checkInterval);
265 }
266
267
268
269
270
271
272
273
274 public void configure(long newWriteLimit, long newReadLimit,
275 long newCheckInterval) {
276 configure(newWriteLimit, newReadLimit);
277 configure(newCheckInterval);
278 }
279
280
281
282
283
284
285
286 public void configure(long newWriteLimit, long newReadLimit) {
287 writeLimit = newWriteLimit;
288 readLimit = newReadLimit;
289 if (trafficCounter != null) {
290 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
291 }
292 }
293
294
295
296
297
298
299 public void configure(long newCheckInterval) {
300 checkInterval = newCheckInterval;
301 if (trafficCounter != null) {
302 trafficCounter.configure(checkInterval);
303 }
304 }
305
306
307
308
309
310
311
312
313 protected void doAccounting(TrafficCounter counter) {
314
315 }
316
317
318
319
320 private class ReopenReadTimerTask implements TimerTask {
321 ChannelHandlerContext ctx;
322 ReopenReadTimerTask(ChannelHandlerContext ctx) {
323 this.ctx = ctx;
324 }
325 public void run(Timeout timeoutArg) throws Exception {
326
327 if (release.get()) {
328 return;
329 }
330
331
332
333
334
335 if (ctx != null && ctx.getChannel() != null &&
336 ctx.getChannel().isConnected()) {
337
338
339 ctx.setAttachment(null);
340 ctx.getChannel().setReadable(true);
341 }
342 }
343 }
344
345
346
347
348
349
350 private static long getTimeToWait(long limit, long bytes, long lastTime,
351 long curtime) {
352 long interval = curtime - lastTime;
353 if (interval == 0) {
354
355 return 0;
356 }
357 return (bytes * 1000 / limit - interval) / 10 * 10;
358 }
359
360 @Override
361 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
362 throws Exception {
363 try {
364 long curtime = System.currentTimeMillis();
365 long size = objectSizeEstimator.estimateSize(e.getMessage());
366 if (trafficCounter != null) {
367 trafficCounter.bytesRecvFlowControl(ctx, size);
368 if (readLimit == 0) {
369
370 return;
371 }
372
373 long wait = getTimeToWait(readLimit,
374 trafficCounter.getCurrentReadBytes(),
375 trafficCounter.getLastTime(), curtime);
376 if (wait >= MINIMAL_WAIT) {
377
378 Channel channel = ctx.getChannel();
379
380 if (channel != null && channel.isConnected()) {
381
382 if (timer == null) {
383
384
385 if (release.get()) {
386 return;
387 }
388 Thread.sleep(wait);
389 return;
390 }
391 if (ctx.getAttachment() == null) {
392
393 ctx.setAttachment(Boolean.TRUE);
394 channel.setReadable(false);
395
396 TimerTask timerTask = new ReopenReadTimerTask(ctx);
397 timeout = timer.newTimeout(timerTask, wait,
398 TimeUnit.MILLISECONDS);
399 } else {
400
401
402
403 if (release.get()) {
404 return;
405 }
406 Thread.sleep(wait);
407 }
408 } else {
409
410
411 if (release.get()) {
412 return;
413 }
414 Thread.sleep(wait);
415 }
416 }
417 }
418 } finally {
419
420 super.messageReceived(ctx, e);
421 }
422 }
423
424 @Override
425 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
426 throws Exception {
427 try {
428 long curtime = System.currentTimeMillis();
429 long size = objectSizeEstimator.estimateSize(e.getMessage());
430 if (trafficCounter != null) {
431 trafficCounter.bytesWriteFlowControl(size);
432 if (writeLimit == 0) {
433 return;
434 }
435
436
437 long wait = getTimeToWait(writeLimit,
438 trafficCounter.getCurrentWrittenBytes(),
439 trafficCounter.getLastTime(), curtime);
440 if (wait >= MINIMAL_WAIT) {
441
442 if (release.get()) {
443 return;
444 }
445 Thread.sleep(wait);
446 }
447 }
448 } finally {
449
450 super.writeRequested(ctx, e);
451 }
452 }
453 @Override
454 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
455 throws Exception {
456 if (e instanceof ChannelStateEvent) {
457 ChannelStateEvent cse = (ChannelStateEvent) e;
458 if (cse.getState() == ChannelState.INTEREST_OPS &&
459 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
460
461
462 boolean readSuspended = ctx.getAttachment() != null;
463 if (readSuspended) {
464
465
466 e.getFuture().setSuccess();
467 return;
468 }
469 }
470 }
471 super.handleDownstream(ctx, e);
472 }
473
474
475
476
477
478
479 public TrafficCounter getTrafficCounter() {
480 return trafficCounter;
481 }
482
483 public void releaseExternalResources() {
484 if (trafficCounter != null) {
485 trafficCounter.stop();
486 }
487 release.set(true);
488 if (timeout != null) {
489 timeout.cancel();
490 }
491 timer.stop();
492 }
493
494 @Override
495 public String toString() {
496 return "TrafficShaping with Write Limit: " + writeLimit +
497 " Read Limit: " + readLimit + " and Counter: " +
498 (trafficCounter != null? trafficCounter.toString() : "none");
499 }
500 }