1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.traffic;
17
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.AbstractCollection;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 @Sharable
87 public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
88 private static final InternalLogger logger =
89 InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
90
91
92
93 final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
94
95
96
97
98 private final AtomicLong queuesSize = new AtomicLong();
99
100
101
102
103 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
104
105
106
107
108 private final AtomicLong cumulativeReadBytes = new AtomicLong();
109
110
111
112
113
114 volatile long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100;
115
116
117
118
119 private volatile long writeChannelLimit;
120
121
122
123
124 private volatile long readChannelLimit;
125
126 private static final float DEFAULT_DEVIATION = 0.1F;
127 private static final float MAX_DEVIATION = 0.4F;
128 private static final float DEFAULT_SLOWDOWN = 0.4F;
129 private static final float DEFAULT_ACCELERATION = -0.1F;
130 private volatile float maxDeviation;
131 private volatile float accelerationFactor;
132 private volatile float slowDownFactor;
133 private volatile boolean readDeviationActive;
134 private volatile boolean writeDeviationActive;
135
136 static final class PerChannel {
137 ArrayDeque<ToSend> messagesQueue;
138 TrafficCounter channelTrafficCounter;
139 long queueSize;
140 long lastWriteTimestamp;
141 long lastReadTimestamp;
142 }
143
144
145
146
147 void createGlobalTrafficCounter(ScheduledExecutorService executor) {
148
149 setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION);
150 if (executor == null) {
151 throw new IllegalArgumentException("Executor must not be null");
152 }
153 TrafficCounter tc = new GlobalChannelTrafficCounter(this, executor, "GlobalChannelTC", checkInterval);
154 setTrafficCounter(tc);
155 tc.start();
156 }
157
158 @Override
159 protected int userDefinedWritabilityIndex() {
160 return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
161 }
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/**
 * Creates a handler with global limits, per-channel limits, a check interval and a maximum wait.
 *
 * @param executor          the executor used by the global traffic counter; must not be {@code null}
 * @param writeGlobalLimit  global write limit, forwarded to the superclass
 * @param readGlobalLimit   global read limit, forwarded to the superclass
 * @param writeChannelLimit write limit applied to each channel individually
 * @param readChannelLimit  read limit applied to each channel individually
 * @param checkInterval     accounting interval, forwarded to the superclass and the counters
 * @param maxTime           maximum delay, forwarded to the superclass (waits are scheduled in
 *                          milliseconds)
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit,
        long checkInterval, long maxTime) {
    super(writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
    // Assign the per-channel limits BEFORE the counter is created: createGlobalTrafficCounter()
    // starts the counter and thereby hands "this" to the executor, so the instance should be
    // fully initialised by then. This also matches the ordering of the other constructors.
    this.writeChannelLimit = writeChannelLimit;
    this.readChannelLimit = readChannelLimit;
    createGlobalTrafficCounter(executor);
}
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/**
 * Creates a handler with global limits, per-channel limits and a check interval; the maximum wait
 * is whatever the matching superclass constructor selects.
 *
 * @param executor          the executor used by the global traffic counter; must not be {@code null}
 * @param writeGlobalLimit  global write limit, forwarded to the superclass
 * @param readGlobalLimit   global read limit, forwarded to the superclass
 * @param writeChannelLimit write limit applied to each channel individually
 * @param readChannelLimit  read limit applied to each channel individually
 * @param checkInterval     accounting interval, forwarded to the superclass and the counters
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit,
        long checkInterval) {
    super(writeGlobalLimit, readGlobalLimit, checkInterval);
    // Per-channel limits first, then the counter is created and started.
    this.readChannelLimit = readChannelLimit;
    this.writeChannelLimit = writeChannelLimit;
    createGlobalTrafficCounter(executor);
}
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
/**
 * Creates a handler with global and per-channel limits; check interval and maximum wait are
 * whatever the matching superclass constructor selects.
 *
 * @param executor          the executor used by the global traffic counter; must not be {@code null}
 * @param writeGlobalLimit  global write limit, forwarded to the superclass
 * @param readGlobalLimit   global read limit, forwarded to the superclass
 * @param writeChannelLimit write limit applied to each channel individually
 * @param readChannelLimit  read limit applied to each channel individually
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit) {
    super(writeGlobalLimit, readGlobalLimit);
    // Per-channel limits first, then the counter is created and started.
    this.readChannelLimit = readChannelLimit;
    this.writeChannelLimit = writeChannelLimit;
    createGlobalTrafficCounter(executor);
}
241
242
243
244
245
246
247
248
249
250
/**
 * Creates a handler with only a check interval; the per-channel limits keep their field defaults
 * (zero) until configured.
 *
 * @param executor      the executor used by the global traffic counter; must not be {@code null}
 * @param checkInterval accounting interval, forwarded to the superclass and the counters
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
    super(checkInterval);
    this.createGlobalTrafficCounter(executor);
}
255
256
257
258
259
260
261
/**
 * Creates a handler relying on the superclass defaults; the per-channel limits keep their field
 * defaults (zero) until configured.
 *
 * @param executor the executor used by the global traffic counter; must not be {@code null}
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) {
    this.createGlobalTrafficCounter(executor);
}
265
266
267
268
269 public float maxDeviation() {
270 return maxDeviation;
271 }
272
273
274
275
276 public float accelerationFactor() {
277 return accelerationFactor;
278 }
279
280
281
282
283 public float slowDownFactor() {
284 return slowDownFactor;
285 }
286
287
288
289
290
291
292
293
294
295
296
297
298 public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) {
299 if (maxDeviation > MAX_DEVIATION) {
300 throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION);
301 }
302 if (slowDownFactor < 0) {
303 throw new IllegalArgumentException("slowDownFactor must be >= 0");
304 }
305 if (accelerationFactor > 0) {
306 throw new IllegalArgumentException("accelerationFactor must be <= 0");
307 }
308 this.maxDeviation = maxDeviation;
309 this.accelerationFactor = 1 + accelerationFactor;
310 this.slowDownFactor = 1 + slowDownFactor;
311 }
312
313 private void computeDeviationCumulativeBytes() {
314
315 long maxWrittenBytes = 0;
316 long maxReadBytes = 0;
317 long minWrittenBytes = Long.MAX_VALUE;
318 long minReadBytes = Long.MAX_VALUE;
319 for (PerChannel perChannel : channelQueues.values()) {
320 long value = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
321 if (maxWrittenBytes < value) {
322 maxWrittenBytes = value;
323 }
324 if (minWrittenBytes > value) {
325 minWrittenBytes = value;
326 }
327 value = perChannel.channelTrafficCounter.cumulativeReadBytes();
328 if (maxReadBytes < value) {
329 maxReadBytes = value;
330 }
331 if (minReadBytes > value) {
332 minReadBytes = value;
333 }
334 }
335 boolean multiple = channelQueues.size() > 1;
336 readDeviationActive = multiple && minReadBytes < maxReadBytes / 2;
337 writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2;
338 cumulativeWrittenBytes.set(maxWrittenBytes);
339 cumulativeReadBytes.set(maxReadBytes);
340 }
341
342 @Override
343 protected void doAccounting(TrafficCounter counter) {
344 computeDeviationCumulativeBytes();
345 super.doAccounting(counter);
346 }
347
348 private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) {
349 if (maxGlobal == 0) {
350
351 return wait;
352 }
353 float ratio = maxLocal / maxGlobal;
354
355 if (ratio > maxDeviation) {
356 if (ratio < 1 - maxDeviation) {
357 return wait;
358 } else {
359 ratio = slowDownFactor;
360 if (wait < MINIMAL_WAIT) {
361 wait = MINIMAL_WAIT;
362 }
363 }
364 } else {
365 ratio = accelerationFactor;
366 }
367 return (long) (wait * ratio);
368 }
369
370
371
372
373 public long getMaxGlobalWriteSize() {
374 return maxGlobalWriteSize;
375 }
376
377
378
379
380
381
382
383
384
385
386
387 public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
388 if (maxGlobalWriteSize <= 0) {
389 throw new IllegalArgumentException("maxGlobalWriteSize must be positive");
390 }
391 this.maxGlobalWriteSize = maxGlobalWriteSize;
392 }
393
394
395
396
397 public long queuesSize() {
398 return queuesSize.get();
399 }
400
401
402
403
404
405 public void configureChannel(long newWriteLimit, long newReadLimit) {
406 writeChannelLimit = newWriteLimit;
407 readChannelLimit = newReadLimit;
408 long now = TrafficCounter.milliSecondFromNano();
409 for (PerChannel perChannel : channelQueues.values()) {
410 perChannel.channelTrafficCounter.resetAccounting(now);
411 }
412 }
413
414
415
416
417 public long getWriteChannelLimit() {
418 return writeChannelLimit;
419 }
420
421
422
423
424 public void setWriteChannelLimit(long writeLimit) {
425 writeChannelLimit = writeLimit;
426 long now = TrafficCounter.milliSecondFromNano();
427 for (PerChannel perChannel : channelQueues.values()) {
428 perChannel.channelTrafficCounter.resetAccounting(now);
429 }
430 }
431
432
433
434
435 public long getReadChannelLimit() {
436 return readChannelLimit;
437 }
438
439
440
441
442 public void setReadChannelLimit(long readLimit) {
443 readChannelLimit = readLimit;
444 long now = TrafficCounter.milliSecondFromNano();
445 for (PerChannel perChannel : channelQueues.values()) {
446 perChannel.channelTrafficCounter.resetAccounting(now);
447 }
448 }
449
450
451
452
453 public final void release() {
454 trafficCounter.stop();
455 }
456
457 private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
458
459 Channel channel = ctx.channel();
460 Integer key = channel.hashCode();
461 PerChannel perChannel = channelQueues.get(key);
462 if (perChannel == null) {
463 perChannel = new PerChannel();
464 perChannel.messagesQueue = new ArrayDeque<ToSend>();
465
466 perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" +
467 ctx.channel().hashCode(), checkInterval);
468 perChannel.queueSize = 0L;
469 perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
470 perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
471 channelQueues.put(key, perChannel);
472 }
473 return perChannel;
474 }
475
476 @Override
477 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
478 getOrSetPerChannel(ctx);
479 trafficCounter.resetCumulativeTime();
480 super.handlerAdded(ctx);
481 }
482
483 @Override
484 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
485 trafficCounter.resetCumulativeTime();
486 Channel channel = ctx.channel();
487 Integer key = channel.hashCode();
488 PerChannel perChannel = channelQueues.remove(key);
489 if (perChannel != null) {
490
491 synchronized (perChannel) {
492 if (channel.isActive()) {
493 for (ToSend toSend : perChannel.messagesQueue) {
494 long size = calculateSize(toSend.toSend);
495 trafficCounter.bytesRealWriteFlowControl(size);
496 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
497 perChannel.queueSize -= size;
498 queuesSize.addAndGet(-size);
499 ctx.write(toSend.toSend, toSend.promise);
500 }
501 } else {
502 queuesSize.addAndGet(-perChannel.queueSize);
503 for (ToSend toSend : perChannel.messagesQueue) {
504 if (toSend.toSend instanceof ByteBuf) {
505 ((ByteBuf) toSend.toSend).release();
506 }
507 }
508 }
509 perChannel.messagesQueue.clear();
510 }
511 }
512 releaseWriteSuspended(ctx);
513 releaseReadSuspended(ctx);
514 super.handlerRemoved(ctx);
515 }
516
517 @Override
518 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
519 long size = calculateSize(msg);
520 long now = TrafficCounter.milliSecondFromNano();
521 if (size > 0) {
522
523 long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
524 Integer key = ctx.channel().hashCode();
525 PerChannel perChannel = channelQueues.get(key);
526 long wait = 0;
527 if (perChannel != null) {
528 wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
529 if (readDeviationActive) {
530
531 long maxLocalRead;
532 maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
533 long maxGlobalRead = cumulativeReadBytes.get();
534 if (maxLocalRead <= 0) {
535 maxLocalRead = 0;
536 }
537 if (maxGlobalRead < maxLocalRead) {
538 maxGlobalRead = maxLocalRead;
539 }
540 wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
541 }
542 }
543 if (wait < waitGlobal) {
544 wait = waitGlobal;
545 }
546 wait = checkWaitReadTime(ctx, wait, now);
547 if (wait >= MINIMAL_WAIT) {
548
549
550 ChannelConfig config = ctx.channel().config();
551 if (logger.isDebugEnabled()) {
552 logger.debug("Read Suspend: " + wait + ':' + config.isAutoRead() + ':'
553 + isHandlerActive(ctx));
554 }
555 if (config.isAutoRead() && isHandlerActive(ctx)) {
556 config.setAutoRead(false);
557 ctx.attr(READ_SUSPENDED).set(true);
558
559
560 Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
561 Runnable reopenTask = attr.get();
562 if (reopenTask == null) {
563 reopenTask = new ReopenReadTimerTask(ctx);
564 attr.set(reopenTask);
565 }
566 ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
567 if (logger.isDebugEnabled()) {
568 logger.debug("Suspend final status => " + config.isAutoRead() + ':'
569 + isHandlerActive(ctx) + " will reopened at: " + wait);
570 }
571 }
572 }
573 }
574 informReadOperation(ctx, now);
575 ctx.fireChannelRead(msg);
576 }
577
578 @Override
579 protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
580 Integer key = ctx.channel().hashCode();
581 PerChannel perChannel = channelQueues.get(key);
582 if (perChannel != null) {
583 if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
584 wait = maxTime;
585 }
586 }
587 return wait;
588 }
589
590 @Override
591 protected void informReadOperation(final ChannelHandlerContext ctx, final long now) {
592 Integer key = ctx.channel().hashCode();
593 PerChannel perChannel = channelQueues.get(key);
594 if (perChannel != null) {
595 perChannel.lastReadTimestamp = now;
596 }
597 }
598
599 private static final class ToSend {
600 final long relativeTimeAction;
601 final Object toSend;
602 final ChannelPromise promise;
603 final long size;
604
605 private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
606 relativeTimeAction = delay;
607 this.toSend = toSend;
608 this.size = size;
609 this.promise = promise;
610 }
611 }
612
613 protected long maximumCumulativeWrittenBytes() {
614 return cumulativeWrittenBytes.get();
615 }
616
617 protected long maximumCumulativeReadBytes() {
618 return cumulativeReadBytes.get();
619 }
620
621
622
623
624
625 public Collection<TrafficCounter> channelTrafficCounters() {
626 return new AbstractCollection<TrafficCounter>() {
627 @Override
628 public Iterator<TrafficCounter> iterator() {
629 return new Iterator<TrafficCounter>() {
630 final Iterator<PerChannel> iter = channelQueues.values().iterator();
631 @Override
632 public boolean hasNext() {
633 return iter.hasNext();
634 }
635 @Override
636 public TrafficCounter next() {
637 return iter.next().channelTrafficCounter;
638 }
639 @Override
640 public void remove() {
641 throw new UnsupportedOperationException();
642 }
643 };
644 }
645 @Override
646 public int size() {
647 return channelQueues.size();
648 }
649 };
650 }
651
652 @Override
653 public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
654 throws Exception {
655 long size = calculateSize(msg);
656 long now = TrafficCounter.milliSecondFromNano();
657 if (size > 0) {
658
659 long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now);
660 Integer key = ctx.channel().hashCode();
661 PerChannel perChannel = channelQueues.get(key);
662 long wait = 0;
663 if (perChannel != null) {
664 wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
665 if (writeDeviationActive) {
666
667 long maxLocalWrite;
668 maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
669 long maxGlobalWrite = cumulativeWrittenBytes.get();
670 if (maxLocalWrite <= 0) {
671 maxLocalWrite = 0;
672 }
673 if (maxGlobalWrite < maxLocalWrite) {
674 maxGlobalWrite = maxLocalWrite;
675 }
676 wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait);
677 }
678 }
679 if (wait < waitGlobal) {
680 wait = waitGlobal;
681 }
682 if (wait >= MINIMAL_WAIT) {
683 if (logger.isDebugEnabled()) {
684 logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
685 + isHandlerActive(ctx));
686 }
687 submitWrite(ctx, msg, size, wait, now, promise);
688 return;
689 }
690 }
691
692 submitWrite(ctx, msg, size, 0, now, promise);
693 }
694
695 @Override
696 protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
697 final long size, final long writedelay, final long now,
698 final ChannelPromise promise) {
699 Channel channel = ctx.channel();
700 Integer key = channel.hashCode();
701 PerChannel perChannel = channelQueues.get(key);
702 if (perChannel == null) {
703
704
705 perChannel = getOrSetPerChannel(ctx);
706 }
707 final ToSend newToSend;
708 long delay = writedelay;
709 boolean globalSizeExceeded = false;
710
711 synchronized (perChannel) {
712 if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
713 trafficCounter.bytesRealWriteFlowControl(size);
714 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
715 ctx.write(msg, promise);
716 perChannel.lastWriteTimestamp = now;
717 return;
718 }
719 if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
720 delay = maxTime;
721 }
722 newToSend = new ToSend(delay + now, msg, size, promise);
723 perChannel.messagesQueue.addLast(newToSend);
724 perChannel.queueSize += size;
725 queuesSize.addAndGet(size);
726 checkWriteSuspend(ctx, delay, perChannel.queueSize);
727 if (queuesSize.get() > maxGlobalWriteSize) {
728 globalSizeExceeded = true;
729 }
730 }
731 if (globalSizeExceeded) {
732 setUserDefinedWritability(ctx, false);
733 }
734 final long futureNow = newToSend.relativeTimeAction;
735 final PerChannel forSchedule = perChannel;
736 ctx.executor().schedule(new Runnable() {
737 @Override
738 public void run() {
739 sendAllValid(ctx, forSchedule, futureNow);
740 }
741 }, delay, TimeUnit.MILLISECONDS);
742 }
743
744 private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
745
746 synchronized (perChannel) {
747 ToSend newToSend = perChannel.messagesQueue.pollFirst();
748 for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
749 if (newToSend.relativeTimeAction <= now) {
750 long size = newToSend.size;
751 trafficCounter.bytesRealWriteFlowControl(size);
752 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
753 perChannel.queueSize -= size;
754 queuesSize.addAndGet(-size);
755 ctx.write(newToSend.toSend, newToSend.promise);
756 perChannel.lastWriteTimestamp = now;
757 } else {
758 perChannel.messagesQueue.addFirst(newToSend);
759 break;
760 }
761 }
762 if (perChannel.messagesQueue.isEmpty()) {
763 releaseWriteSuspended(ctx);
764 }
765 }
766 ctx.flush();
767 }
768
769 @Override
770 public String toString() {
771 return new StringBuilder(340).append(super.toString())
772 .append(" Write Channel Limit: ").append(writeChannelLimit)
773 .append(" Read Channel Limit: ").append(readChannelLimit).toString();
774 }
775 }