1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelEvent;
20 import org.jboss.netty.channel.ChannelHandlerContext;
21 import org.jboss.netty.channel.ChannelState;
22 import org.jboss.netty.channel.ChannelStateEvent;
23 import org.jboss.netty.channel.MessageEvent;
24 import org.jboss.netty.channel.SimpleChannelHandler;
25 import org.jboss.netty.logging.InternalLogger;
26 import org.jboss.netty.logging.InternalLoggerFactory;
27 import org.jboss.netty.util.DefaultObjectSizeEstimator;
28 import org.jboss.netty.util.ExternalResourceReleasable;
29 import org.jboss.netty.util.ObjectSizeEstimator;
30 import org.jboss.netty.util.Timeout;
31 import org.jboss.netty.util.Timer;
32 import org.jboss.netty.util.TimerTask;
33
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public abstract class AbstractTrafficShapingHandler extends
60 SimpleChannelHandler implements ExternalResourceReleasable {
61
62
63
64 static InternalLogger logger = InternalLoggerFactory
65 .getInstance(AbstractTrafficShapingHandler.class);
66
67
68
69
70 public static final long DEFAULT_CHECK_INTERVAL = 1000;
71
72
73
74
75
76 public static final long DEFAULT_MAX_TIME = 15000;
77
78
79
80
81 static final long MINIMAL_WAIT = 10;
82
83
84
85
86 protected TrafficCounter trafficCounter;
87
88
89
90
91 private ObjectSizeEstimator objectSizeEstimator;
92
93
94
95
96 protected Timer timer;
97
98
99
100
101 private volatile Timeout timeout;
102
103
104
105
106 private long writeLimit;
107
108
109
110
111 private long readLimit;
112
113
114
115
116 protected long checkInterval = DEFAULT_CHECK_INTERVAL;
117
118
119
120 protected long maxTime = DEFAULT_MAX_TIME;
121
122
123
124
125
126
127 final AtomicBoolean release = new AtomicBoolean(false);
128
129 private void init(ObjectSizeEstimator newObjectSizeEstimator,
130 Timer newTimer, long newWriteLimit, long newReadLimit,
131 long newCheckInterval, long newMaxTime) {
132 objectSizeEstimator = newObjectSizeEstimator;
133 timer = newTimer;
134 writeLimit = newWriteLimit;
135 readLimit = newReadLimit;
136 checkInterval = newCheckInterval;
137 maxTime = newMaxTime;
138
139 }
140
141
142
143
144
145 void setTrafficCounter(TrafficCounter newTrafficCounter) {
146 trafficCounter = newTrafficCounter;
147 }
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
163 long readLimit, long checkInterval) {
164 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
165 DEFAULT_MAX_TIME);
166 }
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184 protected AbstractTrafficShapingHandler(
185 ObjectSizeEstimator objectSizeEstimator, Timer timer,
186 long writeLimit, long readLimit, long checkInterval) {
187 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
188 }
189
190
191
192
193
194
195
196
197
198
199
200 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
201 long readLimit) {
202 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
203 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
204 }
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219 protected AbstractTrafficShapingHandler(
220 ObjectSizeEstimator objectSizeEstimator, Timer timer,
221 long writeLimit, long readLimit) {
222 init(objectSizeEstimator, timer, writeLimit, readLimit,
223 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
224 }
225
226
227
228
229
230
231
232 protected AbstractTrafficShapingHandler(Timer timer) {
233 init(new DefaultObjectSizeEstimator(), timer, 0, 0,
234 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
235 }
236
237
238
239
240
241
242
243
244
245
246 protected AbstractTrafficShapingHandler(
247 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
248 init(objectSizeEstimator, timer, 0, 0,
249 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
250 }
251
252
253
254
255
256
257
258
259
260
261 protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
262 init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
263 }
264
265
266
267
268
269
270
271
272
273
274
275
276
277 protected AbstractTrafficShapingHandler(
278 ObjectSizeEstimator objectSizeEstimator, Timer timer,
279 long checkInterval) {
280 init(objectSizeEstimator, timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
281 }
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
299 long readLimit, long checkInterval, long maxTime) {
300 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
301 maxTime);
302 }
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322 protected AbstractTrafficShapingHandler(
323 ObjectSizeEstimator objectSizeEstimator, Timer timer,
324 long writeLimit, long readLimit, long checkInterval, long maxTime) {
325 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
326 }
327
328
329
330
331 public void configure(long newWriteLimit, long newReadLimit,
332 long newCheckInterval) {
333 configure(newWriteLimit, newReadLimit);
334 configure(newCheckInterval);
335 }
336
337
338
339
340 public void configure(long newWriteLimit, long newReadLimit) {
341 writeLimit = newWriteLimit;
342 readLimit = newReadLimit;
343 if (trafficCounter != null) {
344 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
345 }
346 }
347
348
349
350
351 public void configure(long newCheckInterval) {
352 setCheckInterval(newCheckInterval);
353 }
354
355
356
357
358 public long getWriteLimit() {
359 return writeLimit;
360 }
361
362
363
364
365 public void setWriteLimit(long writeLimit) {
366 this.writeLimit = writeLimit;
367 if (trafficCounter != null) {
368 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
369 }
370 }
371
372
373
374
375 public long getReadLimit() {
376 return readLimit;
377 }
378
379
380
381
382 public void setReadLimit(long readLimit) {
383 this.readLimit = readLimit;
384 if (trafficCounter != null) {
385 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
386 }
387 }
388
389
390
391
392 public long getCheckInterval() {
393 return checkInterval;
394 }
395
396
397
398
399 public void setCheckInterval(long newCheckInterval) {
400 this.checkInterval = newCheckInterval;
401 if (trafficCounter != null) {
402 trafficCounter.configure(checkInterval);
403 }
404 }
405
406
407
408
409 public long getMaxTimeWait() {
410 return maxTime;
411 }
412
413
414
415
416
417
418 public void setMaxTimeWait(long maxTime) {
419 this.maxTime = maxTime;
420 }
421
422
423
424
425
426
427
428
429 protected void doAccounting(TrafficCounter counter) {
430
431 }
432
433
434
435
436 private class ReopenReadTimerTask implements TimerTask {
437 final ChannelHandlerContext ctx;
438 ReopenReadTimerTask(ChannelHandlerContext ctx) {
439 this.ctx = ctx;
440 }
441 public void run(Timeout timeoutArg) throws Exception {
442
443 if (release.get()) {
444 return;
445 }
446 if (!ctx.getChannel().isReadable() && ctx.getAttachment() == null) {
447
448
449 if (logger.isDebugEnabled()) {
450 logger.debug("Not Unsuspend: " + ctx.getChannel().isReadable() + ":" +
451 (ctx.getAttachment() == null));
452 }
453 ctx.setAttachment(null);
454 } else {
455
456 if (logger.isDebugEnabled()) {
457 if (ctx.getChannel().isReadable() && ctx.getAttachment() != null) {
458 logger.debug("Unsuspend: " + ctx.getChannel().isReadable() + ":" +
459 (ctx.getAttachment() == null));
460 } else {
461 logger.debug("Normal Unsuspend: " + ctx.getChannel().isReadable() + ":" +
462 (ctx.getAttachment() == null));
463 }
464 }
465 ctx.setAttachment(null);
466 ctx.getChannel().setReadable(true);
467 }
468 if (logger.isDebugEnabled()) {
469 logger.debug("Unsupsend final status => " + ctx.getChannel().isReadable() + ":" +
470 (ctx.getAttachment() == null));
471 }
472 }
473 }
474
475 @Override
476 public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
477 throws Exception {
478 try {
479 long size = objectSizeEstimator.estimateSize(evt.getMessage());
480 if (size > 0 && trafficCounter != null) {
481
482 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime);
483 if (wait >= MINIMAL_WAIT) {
484
485 if (release.get()) {
486 return;
487 }
488 Channel channel = ctx.getChannel();
489 if (channel != null && channel.isConnected()) {
490
491 if (logger.isDebugEnabled()) {
492 logger.debug("Read Suspend: " + wait + ":" + channel.isReadable() + ":" +
493 (ctx.getAttachment() == null));
494 }
495 if (timer == null) {
496
497
498 Thread.sleep(wait);
499 return;
500 }
501 if (channel.isReadable() && ctx.getAttachment() == null) {
502 ctx.setAttachment(Boolean.TRUE);
503 channel.setReadable(false);
504 if (logger.isDebugEnabled()) {
505 logger.debug("Suspend final status => " + channel.isReadable() + ":" +
506 (ctx.getAttachment() == null));
507 }
508
509
510 TimerTask timerTask = new ReopenReadTimerTask(ctx);
511 timeout = timer.newTimeout(timerTask, wait,
512 TimeUnit.MILLISECONDS);
513 }
514 }
515 }
516 }
517 } finally {
518
519 super.messageReceived(ctx, evt);
520 }
521 }
522
523 @Override
524 public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
525 throws Exception {
526 long wait = 0;
527 try {
528 long size = objectSizeEstimator.estimateSize(evt.getMessage());
529 if (size > 0 && trafficCounter != null) {
530
531 wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime);
532 if (logger.isDebugEnabled()) {
533 logger.debug("Write Suspend: " + wait + ":" + ctx.getChannel().isReadable() + ":" +
534 (ctx.getAttachment() == null));
535 }
536 if (wait >= MINIMAL_WAIT) {
537 if (release.get()) {
538 return;
539 }
540
541
542
543
544
545
546
547 } else {
548 wait = 0;
549 }
550 }
551 } finally {
552 if (release.get()) {
553 return;
554 }
555
556 submitWrite(ctx, evt, wait);
557 }
558 }
559
560 protected void internalSubmitWrite(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
561 super.writeRequested(ctx, evt);
562 }
563
564 protected abstract void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
565 throws Exception;
566
567 @Override
568 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
569 throws Exception {
570 if (e instanceof ChannelStateEvent) {
571 ChannelStateEvent cse = (ChannelStateEvent) e;
572 if (cse.getState() == ChannelState.INTEREST_OPS &&
573 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
574
575
576 boolean readSuspended = ctx.getAttachment() != null;
577 if (readSuspended) {
578
579
580 e.getFuture().setSuccess();
581 return;
582 }
583 }
584 }
585 super.handleDownstream(ctx, e);
586 }
587
588
589
590
591
592
593 public TrafficCounter getTrafficCounter() {
594 return trafficCounter;
595 }
596
597 public void releaseExternalResources() {
598 if (trafficCounter != null) {
599 trafficCounter.stop();
600 }
601 release.set(true);
602 if (timeout != null) {
603 timeout.cancel();
604 }
605
606 }
607
608 @Override
609 public String toString() {
610 return "TrafficShaping with Write Limit: " + writeLimit +
611 " Read Limit: " + readLimit + " every: " + checkInterval + " and Counter: " +
612 (trafficCounter != null? trafficCounter.toString() : "none");
613 }
614 }