1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelEvent;
20 import org.jboss.netty.channel.ChannelHandlerContext;
21 import org.jboss.netty.channel.ChannelState;
22 import org.jboss.netty.channel.ChannelStateEvent;
23 import org.jboss.netty.channel.MessageEvent;
24 import org.jboss.netty.channel.SimpleChannelHandler;
25 import org.jboss.netty.logging.InternalLogger;
26 import org.jboss.netty.logging.InternalLoggerFactory;
27 import org.jboss.netty.util.DefaultObjectSizeEstimator;
28 import org.jboss.netty.util.ExternalResourceReleasable;
29 import org.jboss.netty.util.ObjectSizeEstimator;
30 import org.jboss.netty.util.Timeout;
31 import org.jboss.netty.util.Timer;
32 import org.jboss.netty.util.TimerTask;
33
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public abstract class AbstractTrafficShapingHandler extends
60 SimpleChannelHandler implements ExternalResourceReleasable {
61
62
63
64 static InternalLogger logger = InternalLoggerFactory
65 .getInstance(AbstractTrafficShapingHandler.class);
66
67
68
69
70 public static final long DEFAULT_CHECK_INTERVAL = 1000;
71
72
73
74
75 private static final long MINIMAL_WAIT = 10;
76
77
78
79
80 protected TrafficCounter trafficCounter;
81
82
83
84
85 private ObjectSizeEstimator objectSizeEstimator;
86
87
88
89
90 protected Timer timer;
91
92
93
94
95 private volatile Timeout timeout;
96
97
98
99
100 private long writeLimit;
101
102
103
104
105 private long readLimit;
106
107
108
109
110 protected long checkInterval = DEFAULT_CHECK_INTERVAL;
111
112
113
114
115
116
117 final AtomicBoolean release = new AtomicBoolean(false);
118
119 private void init(ObjectSizeEstimator newObjectSizeEstimator,
120 Timer newTimer, long newWriteLimit, long newReadLimit,
121 long newCheckInterval) {
122 objectSizeEstimator = newObjectSizeEstimator;
123 timer = newTimer;
124 writeLimit = newWriteLimit;
125 readLimit = newReadLimit;
126 checkInterval = newCheckInterval;
127
128 }
129
130
131
132
133
134 void setTrafficCounter(TrafficCounter newTrafficCounter) {
135 trafficCounter = newTrafficCounter;
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
152 long readLimit, long checkInterval) {
153 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172 protected AbstractTrafficShapingHandler(
173 ObjectSizeEstimator objectSizeEstimator, Timer timer,
174 long writeLimit, long readLimit, long checkInterval) {
175 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
176 }
177
178
179
180
181
182
183
184
185
186
187
188 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
189 long readLimit) {
190 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206 protected AbstractTrafficShapingHandler(
207 ObjectSizeEstimator objectSizeEstimator, Timer timer,
208 long writeLimit, long readLimit) {
209 init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
210 }
211
212
213
214
215
216
217
218 protected AbstractTrafficShapingHandler(Timer timer) {
219 init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
220 }
221
222
223
224
225
226
227
228
229
230
231 protected AbstractTrafficShapingHandler(
232 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
233 init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
234 }
235
236
237
238
239
240
241
242
243
244
245 protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
246 init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
247 }
248
249
250
251
252
253
254
255
256
257
258
259
260
261 protected AbstractTrafficShapingHandler(
262 ObjectSizeEstimator objectSizeEstimator, Timer timer,
263 long checkInterval) {
264 init(objectSizeEstimator, timer, 0, 0, checkInterval);
265 }
266
267
268
269
270 public void configure(long newWriteLimit, long newReadLimit,
271 long newCheckInterval) {
272 configure(newWriteLimit, newReadLimit);
273 configure(newCheckInterval);
274 }
275
276
277
278
279 public void configure(long newWriteLimit, long newReadLimit) {
280 writeLimit = newWriteLimit;
281 readLimit = newReadLimit;
282 if (trafficCounter != null) {
283 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
284 }
285 }
286
287
288
289
290 public void configure(long newCheckInterval) {
291 checkInterval = newCheckInterval;
292 if (trafficCounter != null) {
293 trafficCounter.configure(checkInterval);
294 }
295 }
296
297
298
299
300
301
302
303
304 protected void doAccounting(TrafficCounter counter) {
305
306 }
307
308
309
310
311 private class ReopenReadTimerTask implements TimerTask {
312 final ChannelHandlerContext ctx;
313 ReopenReadTimerTask(ChannelHandlerContext ctx) {
314 this.ctx = ctx;
315 }
316 public void run(Timeout timeoutArg) throws Exception {
317
318 if (release.get()) {
319 return;
320 }
321
322
323
324
325
326 if (ctx != null && ctx.getChannel() != null &&
327 ctx.getChannel().isConnected()) {
328
329
330 ctx.setAttachment(null);
331 ctx.getChannel().setReadable(true);
332 }
333 }
334 }
335
336
337
338
339 private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) {
340 long interval = curtime - lastTime;
341 if (interval <= 0) {
342
343 return 0;
344 }
345 return (bytes * 1000 / limit - interval) / 10 * 10;
346 }
347
348 @Override
349 public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
350 throws Exception {
351 try {
352 long curtime = System.currentTimeMillis();
353 long size = objectSizeEstimator.estimateSize(evt.getMessage());
354 if (trafficCounter != null) {
355 trafficCounter.bytesRecvFlowControl(size);
356 if (readLimit == 0) {
357
358 return;
359 }
360
361 long wait = getTimeToWait(readLimit,
362 trafficCounter.getCurrentReadBytes(),
363 trafficCounter.getLastTime(), curtime);
364 if (wait >= MINIMAL_WAIT) {
365
366 Channel channel = ctx.getChannel();
367
368 if (channel != null && channel.isConnected()) {
369
370 if (timer == null) {
371
372
373 if (release.get()) {
374 return;
375 }
376 Thread.sleep(wait);
377 return;
378 }
379 if (ctx.getAttachment() == null) {
380
381 ctx.setAttachment(Boolean.TRUE);
382 channel.setReadable(false);
383
384 TimerTask timerTask = new ReopenReadTimerTask(ctx);
385 timeout = timer.newTimeout(timerTask, wait,
386 TimeUnit.MILLISECONDS);
387 } else {
388
389
390
391 if (release.get()) {
392 return;
393 }
394 Thread.sleep(wait);
395 }
396 } else {
397
398
399 if (release.get()) {
400 return;
401 }
402 Thread.sleep(wait);
403 }
404 }
405 }
406 } finally {
407
408 super.messageReceived(ctx, evt);
409 }
410 }
411
412 @Override
413 public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
414 throws Exception {
415 try {
416 long curtime = System.currentTimeMillis();
417 long size = objectSizeEstimator.estimateSize(evt.getMessage());
418 if (trafficCounter != null) {
419 trafficCounter.bytesWriteFlowControl(size);
420 if (writeLimit == 0) {
421 return;
422 }
423
424
425 long wait = getTimeToWait(writeLimit,
426 trafficCounter.getCurrentWrittenBytes(),
427 trafficCounter.getLastTime(), curtime);
428 if (wait >= MINIMAL_WAIT) {
429
430 if (release.get()) {
431 return;
432 }
433 Thread.sleep(wait);
434 }
435 }
436 } finally {
437
438 super.writeRequested(ctx, evt);
439 }
440 }
441 @Override
442 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
443 throws Exception {
444 if (e instanceof ChannelStateEvent) {
445 ChannelStateEvent cse = (ChannelStateEvent) e;
446 if (cse.getState() == ChannelState.INTEREST_OPS &&
447 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
448
449
450 boolean readSuspended = ctx.getAttachment() != null;
451 if (readSuspended) {
452
453
454 e.getFuture().setSuccess();
455 return;
456 }
457 }
458 }
459 super.handleDownstream(ctx, e);
460 }
461
462
463
464
465
466
467 public TrafficCounter getTrafficCounter() {
468 return trafficCounter;
469 }
470
471 public void releaseExternalResources() {
472 if (trafficCounter != null) {
473 trafficCounter.stop();
474 }
475 release.set(true);
476 if (timeout != null) {
477 timeout.cancel();
478 }
479 timer.stop();
480 }
481
482 @Override
483 public String toString() {
484 return "TrafficShaping with Write Limit: " + writeLimit +
485 " Read Limit: " + readLimit + " and Counter: " +
486 (trafficCounter != null? trafficCounter.toString() : "none");
487 }
488 }