1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelHandler.Sharable;
20 import org.jboss.netty.channel.ChannelHandlerContext;
21 import org.jboss.netty.channel.ChannelStateEvent;
22 import org.jboss.netty.channel.MessageEvent;
23 import org.jboss.netty.logging.InternalLogger;
24 import org.jboss.netty.logging.InternalLoggerFactory;
25 import org.jboss.netty.util.ObjectSizeEstimator;
26 import org.jboss.netty.util.Timeout;
27 import org.jboss.netty.util.Timer;
28 import org.jboss.netty.util.TimerTask;
29
30 import java.util.AbstractCollection;
31 import java.util.Collection;
32 import java.util.Iterator;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentMap;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicLong;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 @Sharable
88 public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
89 private static final InternalLogger logger =
90 InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
91
92
93
94 final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<Integer, PerChannel>();
95
96
97
98
99 private final AtomicLong queuesSize = new AtomicLong();
100
101
102
103
104 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
105
106
107
108
109 private final AtomicLong cumulativeReadBytes = new AtomicLong();
110
111
112
113
114
115 long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100;
116
117
118
119
120 private volatile long writeChannelLimit;
121
122
123
124
125 private volatile long readChannelLimit;
126
127 private static final float DEFAULT_DEVIATION = 0.1F;
128 private static final float MAX_DEVIATION = 0.4F;
129 private static final float DEFAULT_SLOWDOWN = 0.4F;
130 private static final float DEFAULT_ACCELERATION = -0.1F;
131 private volatile float maxDeviation;
132 private volatile float accelerationFactor;
133 private volatile float slowDownFactor;
134 private volatile boolean readDeviationActive;
135 private volatile boolean writeDeviationActive;
136
137 static final class PerChannel {
138 List<ToSend> messagesQueue;
139 TrafficCounter channelTrafficCounter;
140 long queueSize;
141 long lastWriteTimestamp;
142 long lastReadTimestamp;
143 }
144
145
146
147
148 void createGlobalTrafficCounter(Timer timer) {
149
150 setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION);
151 if (timer == null) {
152 throw new IllegalArgumentException("Timer must not be null");
153 }
154 TrafficCounter tc = new GlobalChannelTrafficCounter(this, timer, "GlobalChannelTC", checkInterval);
155 setTrafficCounter(tc);
156 tc.start();
157 }
158
159 @Override
160 int userDefinedWritabilityIndex() {
161 return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
162 }
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/**
 * Creates a handler with global and per-channel limits, a check interval and a
 * maximal waiting time.
 *
 * @param timer             timer used for the global counter and for delayed operations
 * @param writeGlobalLimit  global write limit, forwarded to the superclass
 * @param readGlobalLimit   global read limit, forwarded to the superclass
 * @param writeChannelLimit write limit applied to each channel
 * @param readChannelLimit  read limit applied to each channel
 * @param checkInterval     check interval, forwarded to the superclass
 * @param maxTime           maximal time to wait, forwarded to the superclass
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(Timer timer,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit,
        long checkInterval, long maxTime) {
    super(timer, writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
    // Assign the per-channel limits BEFORE the global counter is created and started,
    // as every other constructor taking per-channel limits does: starting the counter
    // hands this handler to the timer, so its state should be complete by then.
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(timer);
}
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
/**
 * Creates a handler with global and per-channel limits and a check interval.
 *
 * @param timer             timer used for the global counter and for delayed operations
 * @param writeGlobalLimit  global write limit, forwarded to the superclass
 * @param readGlobalLimit   global read limit, forwarded to the superclass
 * @param writeChannelLimit write limit applied to each channel
 * @param readChannelLimit  read limit applied to each channel
 * @param checkInterval     check interval, forwarded to the superclass
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        Timer timer,
        long writeGlobalLimit,
        long readGlobalLimit,
        long writeChannelLimit,
        long readChannelLimit,
        long checkInterval) {
    super(timer, writeGlobalLimit, readGlobalLimit, checkInterval);
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(timer);
}
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/**
 * Creates a handler with global and per-channel limits, leaving the remaining
 * settings to the superclass defaults.
 *
 * @param timer             timer used for the global counter and for delayed operations
 * @param writeGlobalLimit  global write limit, forwarded to the superclass
 * @param readGlobalLimit   global read limit, forwarded to the superclass
 * @param writeChannelLimit write limit applied to each channel
 * @param readChannelLimit  read limit applied to each channel
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        Timer timer,
        long writeGlobalLimit,
        long readGlobalLimit,
        long writeChannelLimit,
        long readChannelLimit) {
    super(timer, writeGlobalLimit, readGlobalLimit);
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(timer);
}
242
243
244
245
246
247
248
249
250
251
/**
 * Creates a handler with only a check interval; the per-channel limits keep their
 * field defaults.
 *
 * @param timer         timer used for the global counter and for delayed operations
 * @param checkInterval check interval, forwarded to the superclass
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        Timer timer,
        long checkInterval) {
    super(timer, checkInterval);
    createGlobalTrafficCounter(timer);
}
256
257
258
259
260
261
262
/**
 * Creates a handler relying on the superclass defaults; the per-channel limits keep
 * their field defaults.
 *
 * @param timer timer used for the global counter and for delayed operations
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        Timer timer) {
    super(timer);
    createGlobalTrafficCounter(timer);
}
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
/**
 * Creates a handler with a custom size estimator, global and per-channel limits,
 * a check interval and a maximal waiting time.
 *
 * @param objectSizeEstimator estimator forwarded to the superclass
 * @param timer               timer used for the global counter and for delayed operations
 * @param writeLimit          global write limit, forwarded to the superclass
 * @param readLimit           global read limit, forwarded to the superclass
 * @param writeChannelLimit   write limit applied to each channel
 * @param readChannelLimit    read limit applied to each channel
 * @param checkInterval       check interval, forwarded to the superclass
 * @param maxTime             maximal time to wait, forwarded to the superclass
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator,
        Timer timer,
        long writeLimit,
        long readLimit,
        long writeChannelLimit,
        long readChannelLimit,
        long checkInterval,
        long maxTime) {
    super(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(timer);
}
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
/**
 * Creates a handler with a custom size estimator, global and per-channel limits and
 * a check interval.
 *
 * @param objectSizeEstimator estimator forwarded to the superclass
 * @param timer               timer used for the global counter and for delayed operations
 * @param writeLimit          global write limit, forwarded to the superclass
 * @param readLimit           global read limit, forwarded to the superclass
 * @param writeChannelLimit   write limit applied to each channel
 * @param readChannelLimit    read limit applied to each channel
 * @param checkInterval       check interval, forwarded to the superclass
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator,
        Timer timer,
        long writeLimit,
        long readLimit,
        long writeChannelLimit,
        long readChannelLimit,
        long checkInterval) {
    super(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(timer);
}
317
318
319
320
321
322
323
324
325
326
327
328
329
330
/**
 * Creates a handler with a custom size estimator and global and per-channel limits.
 *
 * @param objectSizeEstimator estimator forwarded to the superclass
 * @param timer               timer used for the global counter and for delayed operations
 * @param writeLimit          global write limit, forwarded to the superclass
 * @param readLimit           global read limit, forwarded to the superclass
 * @param writeChannelLimit   write limit applied to each channel
 * @param readChannelLimit    read limit applied to each channel
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator,
        Timer timer,
        long writeLimit,
        long readLimit,
        long writeChannelLimit,
        long readChannelLimit) {
    super(objectSizeEstimator, timer, writeLimit, readLimit);
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(timer);
}
338
339
340
341
342
343
344
345
346
/**
 * Creates a handler with a custom size estimator and a check interval; the
 * per-channel limits keep their field defaults.
 *
 * @param objectSizeEstimator estimator forwarded to the superclass
 * @param timer               timer used for the global counter and for delayed operations
 * @param checkInterval       check interval, forwarded to the superclass
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator,
        Timer timer,
        long checkInterval) {
    super(objectSizeEstimator, timer, checkInterval);
    createGlobalTrafficCounter(timer);
}
352
353
354
355
356
357
/**
 * Creates a handler with a custom size estimator, relying on the superclass defaults
 * for everything else.
 *
 * @param objectSizeEstimator estimator forwarded to the superclass
 * @param timer               timer used for the global counter and for delayed operations
 * @throws IllegalArgumentException if {@code timer} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator,
        Timer timer) {
    super(objectSizeEstimator, timer);
    createGlobalTrafficCounter(timer);
}
362
363
364
365
366 public float maxDeviation() {
367 return maxDeviation;
368 }
369
370
371
372
373 public float accelerationFactor() {
374 return accelerationFactor;
375 }
376
377
378
379
380 public float slowDownFactor() {
381 return slowDownFactor;
382 }
383
384
385
386
387
388
389
390
391
392
393
394
395 public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) {
396 if (maxDeviation > MAX_DEVIATION) {
397 throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION);
398 }
399 if (slowDownFactor < 0) {
400 throw new IllegalArgumentException("slowDownFactor must be >= 0");
401 }
402 if (accelerationFactor > 0) {
403 throw new IllegalArgumentException("accelerationFactor must be <= 0");
404 }
405 this.maxDeviation = maxDeviation;
406 this.accelerationFactor = 1 + accelerationFactor;
407 this.slowDownFactor = 1 + slowDownFactor;
408 }
409
410 private void computeDeviationCumulativeBytes() {
411
412 long maxWrittenBytes = 0;
413 long maxReadBytes = 0;
414 long minWrittenBytes = Long.MAX_VALUE;
415 long minReadBytes = Long.MAX_VALUE;
416 for (PerChannel perChannel : channelQueues.values()) {
417 long value = perChannel.channelTrafficCounter.getCumulativeWrittenBytes();
418 if (maxWrittenBytes < value) {
419 maxWrittenBytes = value;
420 }
421 if (minWrittenBytes > value) {
422 minWrittenBytes = value;
423 }
424 value = perChannel.channelTrafficCounter.getCumulativeReadBytes();
425 if (maxReadBytes < value) {
426 maxReadBytes = value;
427 }
428 if (minReadBytes > value) {
429 minReadBytes = value;
430 }
431 }
432 boolean multiple = channelQueues.size() > 1;
433 readDeviationActive = multiple && minReadBytes < maxReadBytes / 2;
434 writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2;
435 cumulativeWrittenBytes.set(maxWrittenBytes);
436 cumulativeReadBytes.set(maxReadBytes);
437 }
438
439 @Override
440 protected void doAccounting(TrafficCounter counter) {
441 computeDeviationCumulativeBytes();
442 super.doAccounting(counter);
443 }
444
445 private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) {
446 if (maxGlobal == 0) {
447
448 return wait;
449 }
450 float ratio = maxLocal / maxGlobal;
451
452 if (ratio > maxDeviation) {
453 if (ratio < 1 - maxDeviation) {
454 return wait;
455 } else {
456 ratio = slowDownFactor;
457 if (wait < MINIMAL_WAIT) {
458 wait = MINIMAL_WAIT;
459 }
460 }
461 } else {
462 ratio = accelerationFactor;
463 }
464 return (long) (wait * ratio);
465 }
466
467
468
469
470 public long getMaxGlobalWriteSize() {
471 return maxGlobalWriteSize;
472 }
473
474
475
476
477
478
479
480
481
482
483
484 public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
485 this.maxGlobalWriteSize = maxGlobalWriteSize;
486 }
487
488
489
490
491 public long queuesSize() {
492 return queuesSize.get();
493 }
494
495
496
497
498
499 public void configureChannel(long newWriteLimit, long newReadLimit) {
500 writeChannelLimit = newWriteLimit;
501 readChannelLimit = newReadLimit;
502 long now = TrafficCounter.milliSecondFromNano();
503 for (PerChannel perChannel : channelQueues.values()) {
504 perChannel.channelTrafficCounter.resetAccounting(now);
505 }
506 }
507
508
509
510
511 public long getWriteChannelLimit() {
512 return writeChannelLimit;
513 }
514
515
516
517
518 public void setWriteChannelLimit(long writeLimit) {
519 writeChannelLimit = writeLimit;
520 long now = TrafficCounter.milliSecondFromNano();
521 for (PerChannel perChannel : channelQueues.values()) {
522 perChannel.channelTrafficCounter.resetAccounting(now);
523 }
524 }
525
526
527
528
529 public long getReadChannelLimit() {
530 return readChannelLimit;
531 }
532
533
534
535
536 public void setReadChannelLimit(long readLimit) {
537 readChannelLimit = readLimit;
538 long now = TrafficCounter.milliSecondFromNano();
539 for (PerChannel perChannel : channelQueues.values()) {
540 perChannel.channelTrafficCounter.resetAccounting(now);
541 }
542 }
543
544
545
546
547 public final void release() {
548 trafficCounter.stop();
549 }
550
551 private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
552
553 Channel channel = ctx.getChannel();
554 Integer key = channel.hashCode();
555 PerChannel perChannel = channelQueues.get(key);
556 if (perChannel == null) {
557 perChannel = new PerChannel();
558 perChannel.messagesQueue = new LinkedList<ToSend>();
559
560 perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" +
561 ctx.getChannel().hashCode(), checkInterval);
562 perChannel.queueSize = 0L;
563 perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
564 perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
565 channelQueues.put(key, perChannel);
566 }
567 return perChannel;
568 }
569
570 @Override
571 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
572 getOrSetPerChannel(ctx);
573 trafficCounter.resetCumulativeTime();
574 super.channelConnected(ctx, e);
575 }
576
577 @Override
578 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
579 trafficCounter.resetCumulativeTime();
580 Channel channel = ctx.getChannel();
581 Integer key = channel.hashCode();
582 PerChannel perChannel = channelQueues.remove(key);
583 if (perChannel != null) {
584
585 synchronized (perChannel) {
586 queuesSize.addAndGet(-perChannel.queueSize);
587 perChannel.messagesQueue.clear();
588 }
589 }
590 releaseWriteSuspended(ctx);
591 releaseReadSuspended(ctx);
592 super.channelClosed(ctx, e);
593 }
594
595 @Override
596 public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
597 throws Exception {
598 long now = TrafficCounter.milliSecondFromNano();
599 try {
600 ReadWriteStatus rws = checkAttachment(ctx);
601 long size = calculateSize(evt.getMessage());
602 if (size > 0) {
603
604
605 long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
606 Integer key = ctx.getChannel().hashCode();
607 PerChannel perChannel = channelQueues.get(key);
608 long wait = 0;
609 if (perChannel != null) {
610 wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
611 if (readDeviationActive) {
612
613 long maxLocalRead;
614 maxLocalRead = perChannel.channelTrafficCounter.getCumulativeReadBytes();
615 long maxGlobalRead = cumulativeReadBytes.get();
616 if (maxLocalRead <= 0) {
617 maxLocalRead = 0;
618 }
619 if (maxGlobalRead < maxLocalRead) {
620 maxGlobalRead = maxLocalRead;
621 }
622 wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
623 }
624 }
625 if (wait < waitGlobal) {
626 wait = waitGlobal;
627 }
628 wait = checkWaitReadTime(ctx, wait, now);
629 if (wait >= MINIMAL_WAIT) {
630
631 if (release.get()) {
632 return;
633 }
634 Channel channel = ctx.getChannel();
635 if (channel != null && channel.isConnected()) {
636
637 if (logger.isDebugEnabled()) {
638 logger.debug("Read suspend: " + wait + ':' + channel.isReadable() + ':' +
639 rws.readSuspend);
640 }
641 if (timer == null) {
642
643
644 Thread.sleep(wait);
645 return;
646 }
647 if (channel.isReadable() && ! rws.readSuspend) {
648 rws.readSuspend = true;
649 channel.setReadable(false);
650 if (logger.isDebugEnabled()) {
651 logger.debug("Suspend final status => " + channel.isReadable() + ':' +
652 rws.readSuspend);
653 }
654
655
656 if (rws.reopenReadTimerTask == null) {
657 rws.reopenReadTimerTask = new ReopenReadTimerTask(ctx);
658 }
659 timeout = timer.newTimeout(rws.reopenReadTimerTask, wait,
660 TimeUnit.MILLISECONDS);
661 }
662 }
663 }
664 }
665 } finally {
666 informReadOperation(ctx, now);
667
668 ctx.sendUpstream(evt);
669 }
670 }
671
672 @Override
673 protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
674 Integer key = ctx.getChannel().hashCode();
675 PerChannel perChannel = channelQueues.get(key);
676 if (perChannel != null) {
677 if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
678 wait = maxTime;
679 }
680 }
681 return wait;
682 }
683
684 @Override
685 protected void informReadOperation(final ChannelHandlerContext ctx, final long now) {
686 Integer key = ctx.getChannel().hashCode();
687 PerChannel perChannel = channelQueues.get(key);
688 if (perChannel != null) {
689 perChannel.lastReadTimestamp = now;
690 }
691 }
692
693 private static final class ToSend {
694 final long relativeTimeAction;
695 final MessageEvent toSend;
696 final long size;
697
698 private ToSend(final long delay, final MessageEvent toSend, final long size) {
699 relativeTimeAction = delay;
700 this.toSend = toSend;
701 this.size = size;
702 }
703 }
704
705 protected long maximumCumulativeWrittenBytes() {
706 return cumulativeWrittenBytes.get();
707 }
708
709 protected long maximumCumulativeReadBytes() {
710 return cumulativeReadBytes.get();
711 }
712
713
714
715
716
717 public Collection<TrafficCounter> channelTrafficCounters() {
718 return new AbstractCollection<TrafficCounter>() {
719 @Override
720 public Iterator<TrafficCounter> iterator() {
721 return new Iterator<TrafficCounter>() {
722 final Iterator<PerChannel> iter = channelQueues.values().iterator();
723 public boolean hasNext() {
724 return iter.hasNext();
725 }
726 public TrafficCounter next() {
727 return iter.next().channelTrafficCounter;
728 }
729 public void remove() {
730 throw new UnsupportedOperationException();
731 }
732 };
733 }
734 @Override
735 public int size() {
736 return channelQueues.size();
737 }
738 };
739 }
740
741 @Override
742 public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
743 throws Exception {
744 long wait = 0;
745 long size = calculateSize(evt.getMessage());
746 long now = TrafficCounter.milliSecondFromNano();
747 try {
748 if (size > 0) {
749
750 long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now);
751 Integer key = ctx.getChannel().hashCode();
752 PerChannel perChannel = channelQueues.get(key);
753 if (perChannel != null) {
754 wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
755 if (writeDeviationActive) {
756
757 long maxLocalWrite;
758 maxLocalWrite = perChannel.channelTrafficCounter.getCumulativeWrittenBytes();
759 long maxGlobalWrite = cumulativeWrittenBytes.get();
760 if (maxLocalWrite <= 0) {
761 maxLocalWrite = 0;
762 }
763 if (maxGlobalWrite < maxLocalWrite) {
764 maxGlobalWrite = maxLocalWrite;
765 }
766 wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait);
767 }
768 }
769 if (wait < waitGlobal) {
770 wait = waitGlobal;
771 }
772 if (wait < MINIMAL_WAIT || release.get()) {
773 wait = 0;
774 }
775 }
776 } finally {
777
778 submitWrite(ctx, evt, size, wait, now);
779 }
780 }
781
782 @Override
783 protected void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
784 final long size, final long writedelay, final long now) throws Exception {
785 Channel channel = ctx.getChannel();
786 Integer key = channel.hashCode();
787 PerChannel perChannel = channelQueues.get(key);
788 if (perChannel == null) {
789
790
791 perChannel = getOrSetPerChannel(ctx);
792 }
793 final ToSend newToSend;
794 long delay = writedelay;
795 boolean globalSizeExceeded = false;
796
797 synchronized (perChannel) {
798 if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
799 if (!channel.isConnected()) {
800
801 return;
802 }
803 trafficCounter.bytesRealWriteFlowControl(size);
804 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
805 ctx.sendDownstream(evt);
806 perChannel.lastWriteTimestamp = now;
807 return;
808 }
809 if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
810 delay = maxTime;
811 }
812 if (timer == null) {
813
814 Thread.sleep(delay);
815 if (!ctx.getChannel().isConnected()) {
816
817 return;
818 }
819 trafficCounter.bytesRealWriteFlowControl(size);
820 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
821 ctx.sendDownstream(evt);
822 perChannel.lastWriteTimestamp = now;
823 return;
824 }
825 if (!ctx.getChannel().isConnected()) {
826
827 return;
828 }
829 newToSend = new ToSend(delay + now, evt, size);
830 perChannel.messagesQueue.add(newToSend);
831 perChannel.queueSize += size;
832 queuesSize.addAndGet(size);
833 checkWriteSuspend(ctx, delay, perChannel.queueSize);
834 if (queuesSize.get() > maxGlobalWriteSize) {
835 globalSizeExceeded = true;
836 }
837 }
838 if (globalSizeExceeded) {
839 setWritable(ctx, false);
840 }
841 final long futureNow = newToSend.relativeTimeAction;
842 final PerChannel forSchedule = perChannel;
843 timer.newTimeout(new TimerTask() {
844 public void run(Timeout timeout) throws Exception {
845 sendAllValid(ctx, forSchedule, futureNow);
846 }
847 }, delay, TimeUnit.MILLISECONDS);
848 }
849
850 private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now)
851 throws Exception {
852
853 synchronized (perChannel) {
854 while (!perChannel.messagesQueue.isEmpty()) {
855 ToSend newToSend = perChannel.messagesQueue.remove(0);
856 if (newToSend.relativeTimeAction <= now) {
857 if (! ctx.getChannel().isConnected()) {
858
859 break;
860 }
861 long size = newToSend.size;
862 trafficCounter.bytesRealWriteFlowControl(size);
863 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
864 perChannel.queueSize -= size;
865 queuesSize.addAndGet(-size);
866 ctx.sendDownstream(newToSend.toSend);
867 perChannel.lastWriteTimestamp = now;
868 } else {
869 perChannel.messagesQueue.add(0, newToSend);
870 break;
871 }
872 }
873 if (perChannel.messagesQueue.isEmpty()) {
874 releaseWriteSuspended(ctx);
875 }
876 }
877 }
878
879 @Override
880 public String toString() {
881 return new StringBuilder(340).append(super.toString())
882 .append(" Write Channel Limit: ").append(writeChannelLimit)
883 .append(" Read Channel Limit: ").append(readChannelLimit).toString();
884 }
885 }