1 /*
2 * Copyright 2011 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.traffic;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufHolder;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelDuplexHandler;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.FileRegion;
29 import io.netty.util.Attribute;
30 import io.netty.util.AttributeKey;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.util.concurrent.TimeUnit;
35
36 /**
37 * <p>AbstractTrafficShapingHandler allows to limit the global bandwidth
38 * (see {@link GlobalTrafficShapingHandler}) or per session
39 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
40 * It allows you to implement an almost real time monitoring of the bandwidth using
41 * the monitors from {@link TrafficCounter} that will call back every checkInterval
42 * the method doAccounting of this handler.</p>
43 *
44 * <p>If you want for any particular reasons to stop the monitoring (accounting) or to change
45 * the read/write limit or the check interval, several methods allow that for you:</p>
46 * <ul>
47 * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
48 * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
49 * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
50 * </ul>
51 */
52 public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler {
53 private static final InternalLogger logger =
54 InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
55 /**
56 * Default delay between two checks: 1s
57 */
58 public static final long DEFAULT_CHECK_INTERVAL = 1000;
59
60 /**
61 * Default max delay in case of traffic shaping
62 * (during which no communication will occur).
63 * Shall be less than TIMEOUT. Here half of "standard" 30s
64 */
65 public static final long DEFAULT_MAX_TIME = 15000;
66
67 /**
68 * Default max size to not exceed in buffer (write only).
69 */
70 static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
71
72 /**
73 * Default minimal time to wait: 10ms
74 */
75 static final long MINIMAL_WAIT = 10;
76
77 /**
78 * Traffic Counter
79 */
80 protected TrafficCounter trafficCounter;
81
82 /**
83 * Limit in B/s to apply to write
84 */
85 private volatile long writeLimit;
86
87 /**
88 * Limit in B/s to apply to read
89 */
90 private volatile long readLimit;
91
92 /**
93 * Max delay in wait
94 */
95 protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s
96
97 /**
98 * Delay between two performance snapshots
99 */
100 protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
101
102 static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
103 .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
104 static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
105 .getName() + ".REOPEN_TASK");
106
107 /**
108 * Max time to delay before proposing to stop writing new objects from next handlers
109 */
110 volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
111 /**
112 * Max size in the list before proposing to stop writing new objects from next handlers
113 */
114 volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
115
116 /**
117 * Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
118 * Set in final constructor. Must be between 1 and 31
119 */
120 final int userDefinedWritabilityIndex;
121
122 /**
123 * Default value for Channel UserDefinedWritability index
124 */
125 static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
126
127 /**
128 * Default value for Global UserDefinedWritability index
129 */
130 static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
131
132 /**
133 * Default value for GlobalChannel UserDefinedWritability index
134 */
135 static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
136
137 /**
138 * @param newTrafficCounter
139 * the TrafficCounter to set
140 */
141 void setTrafficCounter(TrafficCounter newTrafficCounter) {
142 trafficCounter = newTrafficCounter;
143 }
144
145 /**
146 * @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
147 * For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
148 * for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
149 * for GlobalChannel TSH it is defined as
150 * {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
151 */
152 protected int userDefinedWritabilityIndex() {
153 return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
154 }
155
156 /**
157 * @param writeLimit
158 * 0 or a limit in bytes/s
159 * @param readLimit
160 * 0 or a limit in bytes/s
161 * @param checkInterval
162 * The delay between two computations of performances for
163 * channels or 0 if no stats are to be computed.
164 * @param maxTime
165 * The maximum delay to wait in case of traffic excess.
166 * Must be positive.
167 */
168 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
169 this.maxTime = checkPositive(maxTime, "maxTime");
170
171 userDefinedWritabilityIndex = userDefinedWritabilityIndex();
172 this.writeLimit = writeLimit;
173 this.readLimit = readLimit;
174 this.checkInterval = checkInterval;
175 }
176
177 /**
178 * Constructor using default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
179 * @param writeLimit
180 * 0 or a limit in bytes/s
181 * @param readLimit
182 * 0 or a limit in bytes/s
183 * @param checkInterval
184 * The delay between two computations of performances for
185 * channels or 0 if no stats are to be computed.
186 */
187 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
188 this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
189 }
190
191 /**
192 * Constructor using default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
193 * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
194 *
195 * @param writeLimit
196 * 0 or a limit in bytes/s
197 * @param readLimit
198 * 0 or a limit in bytes/s
199 */
200 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
201 this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
202 }
203
204 /**
205 * Constructor using NO LIMIT, default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
206 * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
207 */
208 protected AbstractTrafficShapingHandler() {
209 this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
210 }
211
212 /**
213 * Constructor using NO LIMIT and
214 * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
215 *
216 * @param checkInterval
217 * The delay between two computations of performances for
218 * channels or 0 if no stats are to be computed.
219 */
220 protected AbstractTrafficShapingHandler(long checkInterval) {
221 this(0, 0, checkInterval, DEFAULT_MAX_TIME);
222 }
223
224 /**
225 * Change the underlying limitations and check interval.
226 * <p>Note the change will be taken as best effort, meaning
227 * that all already scheduled traffics will not be
228 * changed, but only applied to new traffics.</p>
229 * <p>So the expected usage of this method is to be used not too often,
230 * accordingly to the traffic shaping configuration.</p>
231 *
232 * @param newWriteLimit The new write limit (in bytes)
233 * @param newReadLimit The new read limit (in bytes)
234 * @param newCheckInterval The new check interval (in milliseconds)
235 */
236 public void configure(long newWriteLimit, long newReadLimit,
237 long newCheckInterval) {
238 configure(newWriteLimit, newReadLimit);
239 configure(newCheckInterval);
240 }
241
242 /**
243 * Change the underlying limitations.
244 * <p>Note the change will be taken as best effort, meaning
245 * that all already scheduled traffics will not be
246 * changed, but only applied to new traffics.</p>
247 * <p>So the expected usage of this method is to be used not too often,
248 * accordingly to the traffic shaping configuration.</p>
249 *
250 * @param newWriteLimit The new write limit (in bytes)
251 * @param newReadLimit The new read limit (in bytes)
252 */
253 public void configure(long newWriteLimit, long newReadLimit) {
254 writeLimit = newWriteLimit;
255 readLimit = newReadLimit;
256 if (trafficCounter != null) {
257 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
258 }
259 }
260
261 /**
262 * Change the check interval.
263 *
264 * @param newCheckInterval The new check interval (in milliseconds)
265 */
266 public void configure(long newCheckInterval) {
267 checkInterval = newCheckInterval;
268 if (trafficCounter != null) {
269 trafficCounter.configure(checkInterval);
270 }
271 }
272
273 /**
274 * @return the writeLimit
275 */
276 public long getWriteLimit() {
277 return writeLimit;
278 }
279
280 /**
281 * <p>Note the change will be taken as best effort, meaning
282 * that all already scheduled traffics will not be
283 * changed, but only applied to new traffics.</p>
284 * <p>So the expected usage of this method is to be used not too often,
285 * accordingly to the traffic shaping configuration.</p>
286 *
287 * @param writeLimit the writeLimit to set
288 */
289 public void setWriteLimit(long writeLimit) {
290 this.writeLimit = writeLimit;
291 if (trafficCounter != null) {
292 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
293 }
294 }
295
296 /**
297 * @return the readLimit
298 */
299 public long getReadLimit() {
300 return readLimit;
301 }
302
303 /**
304 * <p>Note the change will be taken as best effort, meaning
305 * that all already scheduled traffics will not be
306 * changed, but only applied to new traffics.</p>
307 * <p>So the expected usage of this method is to be used not too often,
308 * accordingly to the traffic shaping configuration.</p>
309 *
310 * @param readLimit the readLimit to set
311 */
312 public void setReadLimit(long readLimit) {
313 this.readLimit = readLimit;
314 if (trafficCounter != null) {
315 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
316 }
317 }
318
319 /**
320 * @return the checkInterval
321 */
322 public long getCheckInterval() {
323 return checkInterval;
324 }
325
326 /**
327 * @param checkInterval the interval in ms between each step check to set, default value being 1000 ms.
328 */
329 public void setCheckInterval(long checkInterval) {
330 this.checkInterval = checkInterval;
331 if (trafficCounter != null) {
332 trafficCounter.configure(checkInterval);
333 }
334 }
335
336 /**
337 * <p>Note the change will be taken as best effort, meaning
338 * that all already scheduled traffics will not be
339 * changed, but only applied to new traffics.</p>
340 * <p>So the expected usage of this method is to be used not too often,
341 * accordingly to the traffic shaping configuration.</p>
342 *
343 * @param maxTime
344 * Max delay in wait, shall be less than TIME OUT in related protocol.
345 * Must be positive.
346 */
347 public void setMaxTimeWait(long maxTime) {
348 this.maxTime = checkPositive(maxTime, "maxTime");
349 }
350
351 /**
352 * @return the max delay in wait to prevent TIME OUT
353 */
354 public long getMaxTimeWait() {
355 return maxTime;
356 }
357
358 /**
359 * @return the maxWriteDelay
360 */
361 public long getMaxWriteDelay() {
362 return maxWriteDelay;
363 }
364
365 /**
366 * <p>Note the change will be taken as best effort, meaning
367 * that all already scheduled traffics will not be
368 * changed, but only applied to new traffics.</p>
369 * <p>So the expected usage of this method is to be used not too often,
370 * accordingly to the traffic shaping configuration.</p>
371 *
372 * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
373 * Must be positive.
374 */
375 public void setMaxWriteDelay(long maxWriteDelay) {
376 this.maxWriteDelay = checkPositive(maxWriteDelay, "maxWriteDelay");
377 }
378
379 /**
380 * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
381 */
382 public long getMaxWriteSize() {
383 return maxWriteSize;
384 }
385
386 /**
387 * <p>Note that this limit is a best effort on memory limitation to prevent Out Of
388 * Memory Exception. To ensure it works, the handler generating the write should
389 * use one of the way provided by Netty to handle the capacity:</p>
390 * <p>- the {@code Channel.isWritable()} property and the corresponding
391 * {@code channelWritabilityChanged()}</p>
392 * <p>- the {@code ChannelFuture.addListener(new GenericFutureListener())}</p>
393 *
394 * @param maxWriteSize the maximum Write Size allowed in the buffer
395 * per channel before write suspended is set,
396 * default being {@value #DEFAULT_MAX_SIZE} bytes.
397 */
398 public void setMaxWriteSize(long maxWriteSize) {
399 this.maxWriteSize = maxWriteSize;
400 }
401
402 /**
403 * Called each time the accounting is computed from the TrafficCounters.
404 * This method could be used for instance to implement almost real time accounting.
405 *
406 * @param counter
407 * the TrafficCounter that computes its performance
408 */
409 protected void doAccounting(TrafficCounter counter) {
410 // NOOP by default
411 }
412
413 /**
414 * Class to implement setReadable at fix time
415 */
416 static final class ReopenReadTimerTask implements Runnable {
417 final ChannelHandlerContext ctx;
418 ReopenReadTimerTask(ChannelHandlerContext ctx) {
419 this.ctx = ctx;
420 }
421
422 @Override
423 public void run() {
424 Channel channel = ctx.channel();
425 ChannelConfig config = channel.config();
426 if (!config.isAutoRead() && isHandlerActive(ctx)) {
427 // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
428 // Then Just reset the status
429 if (logger.isDebugEnabled()) {
430 logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
431 isHandlerActive(ctx));
432 }
433 channel.attr(READ_SUSPENDED).set(false);
434 } else {
435 // Anything else allows the handler to reset the AutoRead
436 if (logger.isDebugEnabled()) {
437 if (config.isAutoRead() && !isHandlerActive(ctx)) {
438 if (logger.isDebugEnabled()) {
439 logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
440 isHandlerActive(ctx));
441 }
442 } else {
443 if (logger.isDebugEnabled()) {
444 logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
445 + isHandlerActive(ctx));
446 }
447 }
448 }
449 channel.attr(READ_SUSPENDED).set(false);
450 config.setAutoRead(true);
451 channel.read();
452 }
453 if (logger.isDebugEnabled()) {
454 logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
455 + isHandlerActive(ctx));
456 }
457 }
458 }
459
460 /**
461 * Release the Read suspension
462 */
463 void releaseReadSuspended(ChannelHandlerContext ctx) {
464 Channel channel = ctx.channel();
465 channel.attr(READ_SUSPENDED).set(false);
466 channel.config().setAutoRead(true);
467 }
468
469 @Override
470 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
471 long size = calculateSize(msg);
472 long now = TrafficCounter.milliSecondFromNano();
473 if (size > 0) {
474 // compute the number of ms to wait before reopening the channel
475 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
476 wait = checkWaitReadTime(ctx, wait, now);
477 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
478 // time in order to try to limit the traffic
479 // Only AutoRead AND HandlerActive True means Context Active
480 Channel channel = ctx.channel();
481 ChannelConfig config = channel.config();
482 if (logger.isDebugEnabled()) {
483 logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
484 + isHandlerActive(ctx));
485 }
486 if (config.isAutoRead() && isHandlerActive(ctx)) {
487 config.setAutoRead(false);
488 channel.attr(READ_SUSPENDED).set(true);
489 // Create a Runnable to reactive the read if needed. If one was create before it will just be
490 // reused to limit object creation
491 Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
492 Runnable reopenTask = attr.get();
493 if (reopenTask == null) {
494 reopenTask = new ReopenReadTimerTask(ctx);
495 attr.set(reopenTask);
496 }
497 ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
498 if (logger.isDebugEnabled()) {
499 logger.debug("Suspend final status => " + config.isAutoRead() + ':'
500 + isHandlerActive(ctx) + " will reopened at: " + wait);
501 }
502 }
503 }
504 }
505 informReadOperation(ctx, now);
506 ctx.fireChannelRead(msg);
507 }
508
509 @Override
510 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
511 Channel channel = ctx.channel();
512 if (channel.hasAttr(REOPEN_TASK)) {
513 //release the reopen task
514 channel.attr(REOPEN_TASK).set(null);
515 }
516 super.handlerRemoved(ctx);
517 }
518
519 /**
520 * Method overridden in GTSH to take into account specific timer for the channel.
521 * @param wait the wait delay computed in ms
522 * @param now the relative now time in ms
523 * @return the wait to use according to the context
524 */
525 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
526 // no change by default
527 return wait;
528 }
529
530 /**
531 * Method overridden in GTSH to take into account specific timer for the channel.
532 * @param now the relative now time in ms
533 */
534 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
535 // default noop
536 }
537
538 protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
539 Boolean suspended = ctx.channel().attr(READ_SUSPENDED).get();
540 return suspended == null || Boolean.FALSE.equals(suspended);
541 }
542
543 @Override
544 public void read(ChannelHandlerContext ctx) {
545 if (isHandlerActive(ctx)) {
546 // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
547 ctx.read();
548 }
549 }
550
551 @Override
552 public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
553 throws Exception {
554 long size = calculateSize(msg);
555 long now = TrafficCounter.milliSecondFromNano();
556 if (size > 0) {
557 // compute the number of ms to wait before continue with the channel
558 long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
559 if (wait >= MINIMAL_WAIT) {
560 if (logger.isDebugEnabled()) {
561 logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
562 + isHandlerActive(ctx));
563 }
564 submitWrite(ctx, msg, size, wait, now, promise);
565 return;
566 }
567 }
568 // to maintain order of write
569 submitWrite(ctx, msg, size, 0, now, promise);
570 }
571
572 @Deprecated
573 protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
574 final long delay, final ChannelPromise promise) {
575 submitWrite(ctx, msg, calculateSize(msg),
576 delay, TrafficCounter.milliSecondFromNano(), promise);
577 }
578
579 abstract void submitWrite(
580 ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
581
582 @Override
583 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
584 setUserDefinedWritability(ctx, true);
585 super.channelRegistered(ctx);
586 }
587
588 void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
589 ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
590 if (cob != null) {
591 cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
592 }
593 }
594
595 /**
596 * Check the writability according to delay and size for the channel.
597 * Set if necessary setUserDefinedWritability status.
598 * @param delay the computed delay
599 * @param queueSize the current queueSize
600 */
601 void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
602 if (queueSize > maxWriteSize || delay > maxWriteDelay) {
603 setUserDefinedWritability(ctx, false);
604 }
605 }
606 /**
607 * Explicitly release the Write suspended status.
608 */
609 void releaseWriteSuspended(ChannelHandlerContext ctx) {
610 setUserDefinedWritability(ctx, true);
611 }
612
613 /**
614 * @return the current TrafficCounter (if
615 * channel is still connected)
616 */
617 public TrafficCounter trafficCounter() {
618 return trafficCounter;
619 }
620
621 @Override
622 public String toString() {
623 StringBuilder builder = new StringBuilder(290)
624 .append("TrafficShaping with Write Limit: ").append(writeLimit)
625 .append(" Read Limit: ").append(readLimit)
626 .append(" CheckInterval: ").append(checkInterval)
627 .append(" maxDelay: ").append(maxWriteDelay)
628 .append(" maxSize: ").append(maxWriteSize)
629 .append(" and Counter: ");
630 if (trafficCounter != null) {
631 builder.append(trafficCounter);
632 } else {
633 builder.append("none");
634 }
635 return builder.toString();
636 }
637
638 /**
639 * Calculate the size of the given {@link Object}.
640 *
641 * This implementation supports {@link ByteBuf}, {@link ByteBufHolder} and {@link FileRegion}.
642 * Sub-classes may override this.
643 * @param msg the msg for which the size should be calculated.
644 * @return size the size of the msg or {@code -1} if unknown.
645 */
646 protected long calculateSize(Object msg) {
647 if (msg instanceof ByteBuf) {
648 return ((ByteBuf) msg).readableBytes();
649 }
650 if (msg instanceof ByteBufHolder) {
651 return ((ByteBufHolder) msg).content().readableBytes();
652 }
653 if (msg instanceof FileRegion) {
654 return ((FileRegion) msg).count();
655 }
656 return -1;
657 }
658 }