1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.Channel;
20 import org.jboss.netty.channel.ChannelHandlerContext;
21 import org.jboss.netty.channel.ChannelStateEvent;
22 import org.jboss.netty.channel.MessageEvent;
23 import org.jboss.netty.channel.SimpleChannelHandler;
24 import org.jboss.netty.logging.InternalLogger;
25 import org.jboss.netty.logging.InternalLoggerFactory;
26 import org.jboss.netty.util.DefaultObjectSizeEstimator;
27 import org.jboss.netty.util.ExternalResourceReleasable;
28 import org.jboss.netty.util.ObjectSizeEstimator;
29 import org.jboss.netty.util.Timeout;
30 import org.jboss.netty.util.Timer;
31 import org.jboss.netty.util.TimerTask;
32
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public abstract class AbstractTrafficShapingHandler extends
57 SimpleChannelHandler implements ExternalResourceReleasable {
58
59
60
61 static InternalLogger logger = InternalLoggerFactory
62 .getInstance(AbstractTrafficShapingHandler.class);
63
64
65
66
67 public static final long DEFAULT_CHECK_INTERVAL = 1000;
68
69
70
71
72
73
74 public static final long DEFAULT_MAX_TIME = 15000;
75
76
77
78
79 static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
80
81
82
83
84 static final long MINIMAL_WAIT = 10;
85
86 static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
87 static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
88 static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
89
90
91
92
93 protected TrafficCounter trafficCounter;
94
95
96
97
98 private ObjectSizeEstimator objectSizeEstimator;
99
100
101
102
103 protected Timer timer;
104
105
106
107
108 volatile Timeout timeout;
109
110
111
112
113 private volatile long writeLimit;
114
115
116
117
118 private volatile long readLimit;
119
120
121
122
123 protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL;
124
125
126
127
128 protected volatile long maxTime = DEFAULT_MAX_TIME;
129
130
131
132
133 volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL;
134
135
136
137
138 volatile long maxWriteSize = DEFAULT_MAX_SIZE;
139
140
141
142
143
144
145 final AtomicBoolean release = new AtomicBoolean(false);
146 final int index;
147
148
149
150
151
152 static final class ReadWriteStatus {
153 volatile boolean readSuspend;
154 volatile TimerTask reopenReadTimerTask;
155 }
156
157
158
159
160
161 public static class SimpleObjectSizeEstimator extends DefaultObjectSizeEstimator {
162 @Override
163 public int estimateSize(Object o) {
164 int size;
165 if (o instanceof ChannelBuffer) {
166 size = ((ChannelBuffer) o).readableBytes();
167 } else {
168 size = super.estimateSize(o);
169 }
170 return size;
171 }
172 }
173
174
175
176
177
178
179
180
181
182 int userDefinedWritabilityIndex() {
183 if (this instanceof GlobalChannelTrafficShapingHandler) {
184 return GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
185 } else if (this instanceof GlobalTrafficShapingHandler) {
186 return GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
187 } else {
188 return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
189 }
190 }
191
192 private void init(ObjectSizeEstimator newObjectSizeEstimator,
193 Timer newTimer, long newWriteLimit, long newReadLimit,
194 long newCheckInterval, long newMaxTime) {
195 if (newMaxTime <= 0) {
196 throw new IllegalArgumentException("maxTime must be positive");
197 }
198 objectSizeEstimator = newObjectSizeEstimator;
199 timer = newTimer;
200 writeLimit = newWriteLimit;
201 readLimit = newReadLimit;
202 checkInterval = newCheckInterval;
203 maxTime = newMaxTime;
204
205 }
206
207
208
209
210 void setTrafficCounter(TrafficCounter newTrafficCounter) {
211 trafficCounter = newTrafficCounter;
212 }
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
229 long readLimit, long checkInterval) {
230 index = userDefinedWritabilityIndex();
231 init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
232 DEFAULT_MAX_TIME);
233 }
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 protected AbstractTrafficShapingHandler(
253 ObjectSizeEstimator objectSizeEstimator, Timer timer,
254 long writeLimit, long readLimit, long checkInterval) {
255 index = userDefinedWritabilityIndex();
256 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
257 }
258
259
260
261
262
263
264
265
266
267
268
269
270
271 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
272 long readLimit) {
273 index = userDefinedWritabilityIndex();
274 init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit,
275 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
276 }
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293 protected AbstractTrafficShapingHandler(
294 ObjectSizeEstimator objectSizeEstimator, Timer timer,
295 long writeLimit, long readLimit) {
296 index = userDefinedWritabilityIndex();
297 init(objectSizeEstimator, timer, writeLimit, readLimit,
298 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
299 }
300
301
302
303
304
305
306
307
308
309 protected AbstractTrafficShapingHandler(Timer timer) {
310 index = userDefinedWritabilityIndex();
311 init(new SimpleObjectSizeEstimator(), timer, 0, 0,
312 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
313 }
314
315
316
317
318
319
320
321
322
323
324
325
326 protected AbstractTrafficShapingHandler(
327 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
328 index = userDefinedWritabilityIndex();
329 init(objectSizeEstimator, timer, 0, 0,
330 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
331 }
332
333
334
335
336
337
338
339
340
341
342
343 protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
344 index = userDefinedWritabilityIndex();
345 init(new SimpleObjectSizeEstimator(), timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
346 }
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361 protected AbstractTrafficShapingHandler(
362 ObjectSizeEstimator objectSizeEstimator, Timer timer,
363 long checkInterval) {
364 index = userDefinedWritabilityIndex();
365 init(objectSizeEstimator, timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
366 }
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
385 long readLimit, long checkInterval, long maxTime) {
386 index = userDefinedWritabilityIndex();
387 init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
388 maxTime);
389 }
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410 protected AbstractTrafficShapingHandler(
411 ObjectSizeEstimator objectSizeEstimator, Timer timer,
412 long writeLimit, long readLimit, long checkInterval, long maxTime) {
413 index = userDefinedWritabilityIndex();
414 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
415 }
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432 public void configure(long newWriteLimit, long newReadLimit,
433 long newCheckInterval) {
434 configure(newWriteLimit, newReadLimit);
435 configure(newCheckInterval);
436 }
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451 public void configure(long newWriteLimit, long newReadLimit) {
452 writeLimit = newWriteLimit;
453 readLimit = newReadLimit;
454 if (trafficCounter != null) {
455 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
456 }
457 }
458
459
460
461
462 public void configure(long newCheckInterval) {
463 setCheckInterval(newCheckInterval);
464 }
465
466
467
468
469 public long getWriteLimit() {
470 return writeLimit;
471 }
472
473
474
475
476
477
478
479
480
481
482 public void setWriteLimit(long writeLimit) {
483 this.writeLimit = writeLimit;
484 if (trafficCounter != null) {
485 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
486 }
487 }
488
489
490
491
492 public long getReadLimit() {
493 return readLimit;
494 }
495
496
497
498
499
500
501
502
503
504
505 public void setReadLimit(long readLimit) {
506 this.readLimit = readLimit;
507 if (trafficCounter != null) {
508 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
509 }
510 }
511
512
513
514
515 public long getCheckInterval() {
516 return checkInterval;
517 }
518
519
520
521
522 public void setCheckInterval(long newCheckInterval) {
523 checkInterval = newCheckInterval;
524 if (trafficCounter != null) {
525 trafficCounter.configure(checkInterval);
526 }
527 }
528
529
530
531
532 public long getMaxTimeWait() {
533 return maxTime;
534 }
535
536
537
538
539
540
541
542
543
544
545
546
547 public void setMaxTimeWait(long maxTime) {
548 if (maxTime <= 0) {
549 throw new IllegalArgumentException("maxTime must be positive");
550 }
551 this.maxTime = maxTime;
552 }
553
554
555
556
557 public long getMaxWriteDelay() {
558 return maxWriteDelay;
559 }
560
561
562
563
564
565
566
567
568
569
570
571 public void setMaxWriteDelay(long maxWriteDelay) {
572 if (maxWriteDelay <= 0) {
573 throw new IllegalArgumentException("maxWriteDelay must be positive");
574 }
575 this.maxWriteDelay = maxWriteDelay;
576 }
577
578
579
580
581 public long getMaxWriteSize() {
582 return maxWriteSize;
583 }
584
585
586
587
588
589
590
591
592
593
594
595
596 public void setMaxWriteSize(long maxWriteSize) {
597 this.maxWriteSize = maxWriteSize;
598 }
599
600
601
602
603
604
605
606
607 protected void doAccounting(TrafficCounter counter) {
608
609 }
610
611
612
613
614 class ReopenReadTimerTask implements TimerTask {
615 final ChannelHandlerContext ctx;
616 ReopenReadTimerTask(ChannelHandlerContext ctx) {
617 this.ctx = ctx;
618 }
619 public void run(Timeout timeoutArg) throws Exception {
620
621 if (release.get()) {
622 return;
623 }
624 ReadWriteStatus rws = checkAttachment(ctx);
625 Channel channel = ctx.getChannel();
626 if (! channel.isConnected()) {
627
628 return;
629 }
630 if (!channel.isReadable() && ! rws.readSuspend) {
631
632
633 if (logger.isDebugEnabled()) {
634 logger.debug("Not unsuspend: " + channel.isReadable() + ':' +
635 rws.readSuspend);
636 }
637 rws.readSuspend = false;
638 } else {
639
640 if (logger.isDebugEnabled()) {
641 if (channel.isReadable() && rws.readSuspend) {
642 logger.debug("Unsuspend: " + channel.isReadable() + ':' +
643 rws.readSuspend);
644 } else {
645 logger.debug("Normal unsuspend: " + channel.isReadable() + ':' +
646 rws.readSuspend);
647 }
648 }
649 rws.readSuspend = false;
650 channel.setReadable(true);
651 }
652 if (logger.isDebugEnabled()) {
653 logger.debug("Unsupsend final status => " + channel.isReadable() + ':' +
654 rws.readSuspend);
655 }
656 }
657 }
658
659
660
661
662 void releaseReadSuspended(ChannelHandlerContext ctx) {
663 ReadWriteStatus rws = checkAttachment(ctx);
664 rws.readSuspend = false;
665 ctx.getChannel().setReadable(true);
666 }
667
668 @Override
669 public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
670 throws Exception {
671 long now = TrafficCounter.milliSecondFromNano();
672 try {
673 ReadWriteStatus rws = checkAttachment(ctx);
674 long size = calculateSize(evt.getMessage());
675 if (size > 0 && trafficCounter != null) {
676
677 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
678 wait = checkWaitReadTime(ctx, wait, now);
679 if (wait >= MINIMAL_WAIT) {
680
681 if (release.get()) {
682 return;
683 }
684 Channel channel = ctx.getChannel();
685 if (channel != null && channel.isConnected()) {
686
687 if (logger.isDebugEnabled()) {
688 logger.debug("Read suspend: " + wait + ':' + channel.isReadable() + ':' +
689 rws.readSuspend);
690 }
691 if (timer == null) {
692
693
694 Thread.sleep(wait);
695 return;
696 }
697 if (channel.isReadable() && ! rws.readSuspend) {
698 rws.readSuspend = true;
699 channel.setReadable(false);
700 if (logger.isDebugEnabled()) {
701 logger.debug("Suspend final status => " + channel.isReadable() + ':' +
702 rws.readSuspend);
703 }
704
705
706 if (rws.reopenReadTimerTask == null) {
707 rws.reopenReadTimerTask = new ReopenReadTimerTask(ctx);
708 }
709 timeout = timer.newTimeout(rws.reopenReadTimerTask, wait,
710 TimeUnit.MILLISECONDS);
711 }
712 }
713 }
714 }
715 } finally {
716 informReadOperation(ctx, now);
717
718 ctx.sendUpstream(evt);
719 }
720 }
721
722
723
724
725
726
727
728 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
729
730 return wait;
731 }
732
733
734
735
736
737 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
738
739 }
740
741 @Override
742 public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
743 throws Exception {
744 long wait = 0;
745 long size = calculateSize(evt.getMessage());
746 long now = TrafficCounter.milliSecondFromNano();
747 Channel channel = ctx.getChannel();
748 try {
749 if (size > 0 && trafficCounter != null) {
750
751 wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
752 if (logger.isDebugEnabled()) {
753 logger.debug("Write suspend: " + wait + ':' + channel.isWritable() + ':' +
754 channel.getUserDefinedWritability(index));
755 }
756 if (wait < MINIMAL_WAIT || release.get()) {
757 wait = 0;
758 }
759 }
760 } finally {
761
762 submitWrite(ctx, evt, size, wait, now);
763 }
764 }
765
766 @Deprecated
767 protected void internalSubmitWrite(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
768 ctx.sendDownstream(evt);
769 }
770
771 @Deprecated
772 protected void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
773 final long delay) throws Exception {
774 submitWrite(ctx, evt, calculateSize(evt.getMessage()), delay, TrafficCounter.milliSecondFromNano());
775 }
776
777 abstract void submitWrite(ChannelHandlerContext ctx, MessageEvent evt, long size,
778 long delay, long now) throws Exception;
779
780 void setWritable(ChannelHandlerContext ctx, boolean writable) {
781 Channel channel = ctx.getChannel();
782 if (channel.isConnected()) {
783 channel.setUserDefinedWritability(index, writable);
784 }
785 }
786
787
788
789
790
791
792
793 void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
794 if (queueSize > maxWriteSize || delay > maxWriteDelay) {
795 setWritable(ctx, false);
796 }
797 }
798
799
800
801
802 void releaseWriteSuspended(ChannelHandlerContext ctx) {
803 setWritable(ctx, true);
804 }
805
806
807
808
809
810 public TrafficCounter getTrafficCounter() {
811 return trafficCounter;
812 }
813
814 public void releaseExternalResources() {
815 if (trafficCounter != null) {
816 trafficCounter.stop();
817 }
818 release.set(true);
819 if (timeout != null) {
820 timeout.cancel();
821 }
822
823 }
824
825 static ReadWriteStatus checkAttachment(ChannelHandlerContext ctx) {
826 ReadWriteStatus rws = (ReadWriteStatus) ctx.getAttachment();
827 if (rws == null) {
828 rws = new ReadWriteStatus();
829 ctx.setAttachment(rws);
830 }
831 return rws;
832 }
833
834 @Override
835 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
836 checkAttachment(ctx);
837 setWritable(ctx, true);
838 super.channelConnected(ctx, e);
839 }
840
841 protected long calculateSize(Object obj) {
842
843 return objectSizeEstimator.estimateSize(obj);
844 }
845
846 @Override
847 public String toString() {
848 StringBuilder builder = new StringBuilder(290)
849 .append("TrafficShaping with Write Limit: ").append(writeLimit)
850 .append(" Read Limit: ").append(readLimit)
851 .append(" CheckInterval: ").append(checkInterval)
852 .append(" maxDelay: ").append(maxWriteDelay)
853 .append(" maxSize: ").append(maxWriteSize)
854 .append(" and Counter: ");
855 if (trafficCounter != null) {
856 builder.append(trafficCounter);
857 } else {
858 builder.append("none");
859 }
860 return builder.toString();
861 }
862 }