1 /*
2 * Copyright 2011 The Netty Project
3 * The Netty Project licenses this file to you under the Apache License,
4 * version 2.0 (the "License"); you may not use this file except in compliance
5 * with the License. You may obtain a copy of the License at:
6 * http://www.apache.org/licenses/LICENSE-2.0
7 * Unless required by applicable law or agreed to in writing, software
8 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 * License for the specific language governing permissions and limitations
11 * under the License.
12 */
13 package io.netty.handler.traffic;
14
15 import io.netty.buffer.ByteBuf;
16 import io.netty.buffer.ByteBufHolder;
17 import io.netty.channel.ChannelDuplexHandler;
18 import io.netty.channel.ChannelConfig;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.util.Attribute;
23 import io.netty.util.AttributeKey;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26
27 import java.util.concurrent.TimeUnit;
28
29 /**
30 * <p>AbstractTrafficShapingHandler allows to limit the global bandwidth
31 * (see {@link GlobalTrafficShapingHandler}) or per session
32 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
33 * It allows you to implement an almost real time monitoring of the bandwidth using
34 * the monitors from {@link TrafficCounter} that will call back every checkInterval
35 * the method doAccounting of this handler.</p>
36 *
37 * <p>If you want for any particular reasons to stop the monitoring (accounting) or to change
38 * the read/write limit or the check interval, several methods allow that for you:</p>
39 * <ul>
40 * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
41 * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop or start the
42 * monitoring, to change the checkInterval directly, or to have access to its values.</li>
43 * </ul>
44 */
45 public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler {
46 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
47
48 /**
49 * Default delay between two checks: 1s
50 */
51 public static final long DEFAULT_CHECK_INTERVAL = 1000;
52
53 /**
54 * Default max delay in case of traffic shaping
55 * (during which no communication will occur).
56 * Shall be less than TIMEOUT. Here half of "standard" 30s
57 */
58 public static final long DEFAULT_MAX_TIME = 15000;
59
60 /**
61 * Default max size to not exceed in buffer (write only).
62 */
63 static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
64
65 /**
66 * Default minimal time to wait
67 */
68 static final long MINIMAL_WAIT = 10;
69
70 /**
71 * Traffic Counter
72 */
73 protected TrafficCounter trafficCounter;
74
75 /**
76 * Limit in B/s to apply to write
77 */
78 private volatile long writeLimit;
79
80 /**
81 * Limit in B/s to apply to read
82 */
83 private volatile long readLimit;
84
85 /**
86 * Max delay in wait
87 */
88 protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s
89
90 /**
91 * Delay between two performance snapshots
92 */
93 protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
94
95 static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
96 .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
97 static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
98 .getName() + ".REOPEN_TASK");
99
100 /**
101 * Max time to delay before proposing to stop writing new objects from next handlers
102 */
103 volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
104 /**
105 * Max size in the list before proposing to stop writing new objects from next handlers
106 */
107 volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
108
109 /**
110 * Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
111 * Set in final constructor. Must be between 1 and 31
112 */
113 final int userDefinedWritabilityIndex;
114
115 /**
116 * Default value for Channel UserDefinedWritability index
117 */
118 static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
119
120 /**
121 * Default value for Global UserDefinedWritability index
122 */
123 static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
124
125 /**
126 * Default value for GlobalChannel UserDefinedWritability index
127 */
128 static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
129
130 /**
131 * @param newTrafficCounter
132 * the TrafficCounter to set
133 */
134 void setTrafficCounter(TrafficCounter newTrafficCounter) {
135 trafficCounter = newTrafficCounter;
136 }
137
138 /**
139 * @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
140 * For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
141 * for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
142 * for GlobalChannel TSH it is defined as
143 * {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
144 */
145 protected int userDefinedWritabilityIndex() {
146 return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
147 }
148
149 /**
150 * @param writeLimit
151 * 0 or a limit in bytes/s
152 * @param readLimit
153 * 0 or a limit in bytes/s
154 * @param checkInterval
155 * The delay between two computations of performances for
156 * channels or 0 if no stats are to be computed.
157 * @param maxTime
158 * The maximum delay to wait in case of traffic excess.
159 * Must be positive.
160 */
161 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
162 if (maxTime <= 0) {
163 throw new IllegalArgumentException("maxTime must be positive");
164 }
165
166 userDefinedWritabilityIndex = userDefinedWritabilityIndex();
167 this.writeLimit = writeLimit;
168 this.readLimit = readLimit;
169 this.checkInterval = checkInterval;
170 this.maxTime = maxTime;
171 }
172
173 /**
174 * Constructor using default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
175 * @param writeLimit
176 * 0 or a limit in bytes/s
177 * @param readLimit
178 * 0 or a limit in bytes/s
179 * @param checkInterval
180 * The delay between two computations of performances for
181 * channels or 0 if no stats are to be computed.
182 */
183 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
184 this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
185 }
186
187 /**
188 * Constructor using default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
189 * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
190 *
191 * @param writeLimit
192 * 0 or a limit in bytes/s
193 * @param readLimit
194 * 0 or a limit in bytes/s
195 */
196 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
197 this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
198 }
199
200 /**
201 * Constructor using NO LIMIT, default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
202 * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
203 */
204 protected AbstractTrafficShapingHandler() {
205 this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
206 }
207
208 /**
209 * Constructor using NO LIMIT and
210 * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
211 *
212 * @param checkInterval
213 * The delay between two computations of performances for
214 * channels or 0 if no stats are to be computed.
215 */
216 protected AbstractTrafficShapingHandler(long checkInterval) {
217 this(0, 0, checkInterval, DEFAULT_MAX_TIME);
218 }
219
220 /**
221 * Change the underlying limitations and check interval.
222 * <p>Note the change will be taken as best effort, meaning
223 * that all already scheduled traffics will not be
224 * changed, but only applied to new traffics.</p>
225 * <p>So the expected usage of this method is to be used not too often,
226 * accordingly to the traffic shaping configuration.</p>
227 *
228 * @param newWriteLimit
229 * The new write limit (in bytes)
230 * @param newReadLimit
231 * The new read limit (in bytes)
232 * @param newCheckInterval
233 * The new check interval (in milliseconds)
234 */
235 public void configure(long newWriteLimit, long newReadLimit, long newCheckInterval) {
236 configure(newWriteLimit, newReadLimit);
237 configure(newCheckInterval);
238 }
239
240 /**
241 * Change the underlying limitations.
242 * <p>Note the change will be taken as best effort, meaning
243 * that all already scheduled traffics will not be
244 * changed, but only applied to new traffics.</p>
245 * <p>So the expected usage of this method is to be used not too often,
246 * accordingly to the traffic shaping configuration.</p>
247 *
248 * @param newWriteLimit
249 * The new write limit (in bytes)
250 * @param newReadLimit
251 * The new read limit (in bytes)
252 */
253 public void configure(long newWriteLimit, long newReadLimit) {
254 writeLimit = newWriteLimit;
255 readLimit = newReadLimit;
256 if (trafficCounter != null) {
257 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
258 }
259 }
260
261 /**
262 * Change the check interval.
263 *
264 * @param newCheckInterval
265 * The new check interval (in milliseconds)
266 */
267 public void configure(long newCheckInterval) {
268 checkInterval = newCheckInterval;
269 if (trafficCounter != null) {
270 trafficCounter.configure(checkInterval);
271 }
272 }
273
274 /**
275 * @return the writeLimit
276 */
277 public long getWriteLimit() {
278 return writeLimit;
279 }
280
281 /**
282 * <p>Note the change will be taken as best effort, meaning
283 * that all already scheduled traffics will not be
284 * changed, but only applied to new traffics.</p>
285 * <p>So the expected usage of this method is to be used not too often,
286 * accordingly to the traffic shaping configuration.</p>
287 *
288 * @param writeLimit the writeLimit to set
289 */
290 public void setWriteLimit(long writeLimit) {
291 this.writeLimit = writeLimit;
292 if (trafficCounter != null) {
293 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
294 }
295 }
296
297 /**
298 * @return the readLimit
299 */
300 public long getReadLimit() {
301 return readLimit;
302 }
303
304 /**
305 * <p>Note the change will be taken as best effort, meaning
306 * that all already scheduled traffics will not be
307 * changed, but only applied to new traffics.</p>
308 * <p>So the expected usage of this method is to be used not too often,
309 * accordingly to the traffic shaping configuration.</p>
310 *
311 * @param readLimit the readLimit to set
312 */
313 public void setReadLimit(long readLimit) {
314 this.readLimit = readLimit;
315 if (trafficCounter != null) {
316 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
317 }
318 }
319
320 /**
321 * @return the checkInterval
322 */
323 public long getCheckInterval() {
324 return checkInterval;
325 }
326
327 /**
328 * @param checkInterval the interval in ms between each step check to set, default value being 1000 ms.
329 */
330 public void setCheckInterval(long checkInterval) {
331 this.checkInterval = checkInterval;
332 if (trafficCounter != null) {
333 trafficCounter.configure(checkInterval);
334 }
335 }
336
337 /**
338 * <p>Note the change will be taken as best effort, meaning
339 * that all already scheduled traffics will not be
340 * changed, but only applied to new traffics.</p>
341 * <p>So the expected usage of this method is to be used not too often,
342 * accordingly to the traffic shaping configuration.</p>
343 *
344 * @param maxTime
345 * Max delay in wait, shall be less than TIME OUT in related protocol.
346 * Must be positive.
347 */
348 public void setMaxTimeWait(long maxTime) {
349 if (maxTime <= 0) {
350 throw new IllegalArgumentException("maxTime must be positive");
351 }
352 this.maxTime = maxTime;
353 }
354
355 /**
356 * @return the max delay in wait to prevent TIME OUT
357 */
358 public long getMaxTimeWait() {
359 return maxTime;
360 }
361
362 /**
363 * @return the maxWriteDelay
364 */
365 public long getMaxWriteDelay() {
366 return maxWriteDelay;
367 }
368
369 /**
370 * <p>Note the change will be taken as best effort, meaning
371 * that all already scheduled traffics will not be
372 * changed, but only applied to new traffics.</p>
373 * <p>So the expected usage of this method is to be used not too often,
374 * accordingly to the traffic shaping configuration.</p>
375 *
376 * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
377 * Must be positive.
378 */
379 public void setMaxWriteDelay(long maxWriteDelay) {
380 if (maxWriteDelay <= 0) {
381 throw new IllegalArgumentException("maxWriteDelay must be positive");
382 }
383 this.maxWriteDelay = maxWriteDelay;
384 }
385
386 /**
387 * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
388 */
389 public long getMaxWriteSize() {
390 return maxWriteSize;
391 }
392
393 /**
394 * <p>Note that this limit is a best effort on memory limitation to prevent Out Of
395 * Memory Exception. To ensure it works, the handler generating the write should
396 * use one of the way provided by Netty to handle the capacity:</p>
397 * <p>- the {@code Channel.isWritable()} property and the corresponding
398 * {@code channelWritabilityChanged()}</p>
399 * <p>- the {@code ChannelFuture.addListener(new GenericFutureListener())}</p>
400 *
401 * @param maxWriteSize the maximum Write Size allowed in the buffer
402 * per channel before write suspended is set,
403 * default being {@value #DEFAULT_MAX_SIZE} bytes.
404 */
405 public void setMaxWriteSize(long maxWriteSize) {
406 this.maxWriteSize = maxWriteSize;
407 }
408
409 /**
410 * Called each time the accounting is computed from the TrafficCounters.
411 * This method could be used for instance to implement almost real time accounting.
412 *
413 * @param counter
414 * the TrafficCounter that computes its performance
415 */
416 protected void doAccounting(TrafficCounter counter) {
417 // NOOP by default
418 }
419
420 /**
421 * Class to implement setReadable at fix time
422 */
423 static final class ReopenReadTimerTask implements Runnable {
424 final ChannelHandlerContext ctx;
425
426 ReopenReadTimerTask(ChannelHandlerContext ctx) {
427 this.ctx = ctx;
428 }
429
430 @Override
431 public void run() {
432 ChannelConfig config = ctx.channel().config();
433 if (!config.isAutoRead() && isHandlerActive(ctx)) {
434 // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
435 // Then Just reset the status
436 if (logger.isDebugEnabled()) {
437 logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
438 isHandlerActive(ctx));
439 }
440 ctx.attr(READ_SUSPENDED).set(false);
441 } else {
442 // Anything else allows the handler to reset the AutoRead
443 if (logger.isDebugEnabled()) {
444 if (config.isAutoRead() && !isHandlerActive(ctx)) {
445 logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
446 isHandlerActive(ctx));
447 } else {
448 logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
449 + isHandlerActive(ctx));
450 }
451 }
452 ctx.attr(READ_SUSPENDED).set(false);
453 config.setAutoRead(true);
454 ctx.channel().read();
455 }
456 if (logger.isDebugEnabled()) {
457 logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
458 + isHandlerActive(ctx));
459 }
460 }
461 }
462
463 /**
464 * Release the Read suspension
465 */
466 void releaseReadSuspended(ChannelHandlerContext ctx) {
467 ctx.attr(READ_SUSPENDED).set(false);
468 ctx.channel().config().setAutoRead(true);
469 }
470
471 @Override
472 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
473 long size = calculateSize(msg);
474 long now = TrafficCounter.milliSecondFromNano();
475 if (size > 0) {
476 // compute the number of ms to wait before reopening the channel
477 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
478 wait = checkWaitReadTime(ctx, wait, now);
479 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
480 // time in order to try to limit the traffic
481 // Only AutoRead AND HandlerActive True means Context Active
482 ChannelConfig config = ctx.channel().config();
483 if (logger.isDebugEnabled()) {
484 logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
485 + isHandlerActive(ctx));
486 }
487 if (config.isAutoRead() && isHandlerActive(ctx)) {
488 config.setAutoRead(false);
489 ctx.attr(READ_SUSPENDED).set(true);
490 // Create a Runnable to reactive the read if needed. If one was create before it will just be
491 // reused to limit object creation
492 Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
493 Runnable reopenTask = attr.get();
494 if (reopenTask == null) {
495 reopenTask = new ReopenReadTimerTask(ctx);
496 attr.set(reopenTask);
497 }
498 ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
499 if (logger.isDebugEnabled()) {
500 logger.debug("Suspend final status => " + config.isAutoRead() + ':'
501 + isHandlerActive(ctx) + " will reopened at: " + wait);
502 }
503 }
504 }
505 }
506 informReadOperation(ctx, now);
507 ctx.fireChannelRead(msg);
508 }
509
510 /**
511 * Method overridden in GTSH to take into account specific timer for the channel.
512 * @param wait the wait delay computed in ms
513 * @param now the relative now time in ms
514 * @return the wait to use according to the context
515 */
516 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
517 // no change by default
518 return wait;
519 }
520
521 /**
522 * Method overridden in GTSH to take into account specific timer for the channel.
523 * @param now the relative now time in ms
524 */
525 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
526 // default noop
527 }
528
529 protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
530 Boolean suspended = ctx.attr(READ_SUSPENDED).get();
531 return suspended == null || Boolean.FALSE.equals(suspended);
532 }
533
534 @Override
535 public void read(ChannelHandlerContext ctx) {
536 if (isHandlerActive(ctx)) {
537 // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
538 ctx.read();
539 }
540 }
541
542 @Override
543 public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
544 throws Exception {
545 long size = calculateSize(msg);
546 long now = TrafficCounter.milliSecondFromNano();
547 if (size > 0) {
548 // compute the number of ms to wait before continue with the channel
549 long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
550 if (wait >= MINIMAL_WAIT) {
551 if (logger.isDebugEnabled()) {
552 logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
553 + isHandlerActive(ctx));
554 }
555 submitWrite(ctx, msg, size, wait, now, promise);
556 return;
557 }
558 }
559 // to maintain order of write
560 submitWrite(ctx, msg, size, 0, now, promise);
561 }
562
563 @Deprecated
564 protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
565 final long delay, final ChannelPromise promise) {
566 submitWrite(ctx, msg, calculateSize(msg),
567 delay, TrafficCounter.milliSecondFromNano(), promise);
568 }
569
570 abstract void submitWrite(
571 ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
572
573 @Override
574 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
575 setUserDefinedWritability(ctx, true);
576 super.channelRegistered(ctx);
577 }
578
579 void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
580 ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
581 if (cob != null) {
582 cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
583 }
584 }
585
586 /**
587 * Check the writability according to delay and size for the channel.
588 * Set if necessary setUserDefinedWritability status.
589 * @param delay the computed delay
590 * @param queueSize the current queueSize
591 */
592 void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
593 if (queueSize > maxWriteSize || delay > maxWriteDelay) {
594 setUserDefinedWritability(ctx, false);
595 }
596 }
597 /**
598 * Explicitly release the Write suspended status.
599 */
600 void releaseWriteSuspended(ChannelHandlerContext ctx) {
601 setUserDefinedWritability(ctx, true);
602 }
603
604 /**
605 * @return the current TrafficCounter (if
606 * channel is still connected)
607 */
608 public TrafficCounter trafficCounter() {
609 return trafficCounter;
610 }
611
612 @Override
613 public String toString() {
614 StringBuilder builder = new StringBuilder(290)
615 .append("TrafficShaping with Write Limit: ").append(writeLimit)
616 .append(" Read Limit: ").append(readLimit)
617 .append(" CheckInterval: ").append(checkInterval)
618 .append(" maxDelay: ").append(maxWriteDelay)
619 .append(" maxSize: ").append(maxWriteSize)
620 .append(" and Counter: ");
621 if (trafficCounter != null) {
622 builder.append(trafficCounter);
623 } else {
624 builder.append("none");
625 }
626 return builder.toString();
627 }
628
629 /**
630 * Calculate the size of the given {@link Object}.
631 *
632 * This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may override this.
633 *
634 * @param msg
635 * the msg for which the size should be calculated.
636 * @return size the size of the msg or {@code -1} if unknown.
637 */
638 protected long calculateSize(Object msg) {
639 if (msg instanceof ByteBuf) {
640 return ((ByteBuf) msg).readableBytes();
641 }
642 if (msg instanceof ByteBufHolder) {
643 return ((ByteBufHolder) msg).content().readableBytes();
644 }
645 return -1;
646 }
647 }