1 /*
2 * Copyright 2011 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.handler.traffic;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelHandler;
21 import io.netty5.channel.ChannelHandlerContext;
22 import io.netty5.channel.ChannelOption;
23 import io.netty5.channel.FileRegion;
24 import io.netty5.util.Attribute;
25 import io.netty5.util.AttributeKey;
26 import io.netty5.util.concurrent.Future;
27 import io.netty5.util.concurrent.Promise;
28 import io.netty5.util.internal.logging.InternalLogger;
29 import io.netty5.util.internal.logging.InternalLoggerFactory;
30
31 import java.util.concurrent.TimeUnit;
32
33 import static io.netty5.util.internal.ObjectUtil.checkPositive;
34
/**
 * <p>AbstractTrafficShapingHandler allows limiting the global bandwidth
 * (see {@link GlobalTrafficShapingHandler}) or the per-session
 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
 * It also lets you implement an almost real-time monitoring of the bandwidth using
 * the monitors from {@link TrafficCounter}, which call back the {@code doAccounting}
 * method of this handler every checkInterval.</p>
 *
 * <p>If, for any particular reason, you want to stop the monitoring (accounting) or to change
 * the read/write limit or the check interval, several methods allow that:</p>
 * <ul>
 * <li>{@code configure} allows you to change the read or write limits, or the checkInterval</li>
 * <li>{@code trafficCounter} gives you access to the TrafficCounter, and so lets you stop
 * or start the monitoring, change the checkInterval directly, or read its values.</li>
 * </ul>
 */
51 public abstract class AbstractTrafficShapingHandler implements ChannelHandler {
52 private static final InternalLogger logger =
53 InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
54 /**
55 * Default delay between two checks: 1s
56 */
57 public static final long DEFAULT_CHECK_INTERVAL = 1000;
58
59 /**
60 * Default max delay in case of traffic shaping
61 * (during which no communication will occur).
62 * Shall be less than TIMEOUT. Here half of "standard" 30s
63 */
64 public static final long DEFAULT_MAX_TIME = 15000;
65
66 /**
67 * Default max size to not exceed in buffer (write only).
68 */
69 static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
70
71 /**
72 * Default minimal time to wait: 10ms
73 */
74 static final long MINIMAL_WAIT = 10;
75
76 /**
77 * Traffic Counter
78 */
79 protected TrafficCounter trafficCounter;
80
81 /**
82 * Limit in B/s to apply to write
83 */
84 private volatile long writeLimit;
85
86 /**
87 * Limit in B/s to apply to read
88 */
89 private volatile long readLimit;
90
91 /**
92 * Max delay in wait
93 */
94 protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s
95
96 /**
97 * Delay between two performance snapshots
98 */
99 protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
100
101 static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
102 .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
103 static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
104 .getName() + ".REOPEN_TASK");
105
106 /**
107 * Max time to delay before proposing to stop writing new objects from next handlers
108 */
109 volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
110 /**
111 * Max size in the list before proposing to stop writing new objects from next handlers
112 */
113 volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
114
115 /**
116 * Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
117 * Set in final constructor. Must be between 1 and 31
118 */
119 final int userDefinedWritabilityIndex;
120
121 /**
122 * Default value for Channel UserDefinedWritability index
123 */
124 static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
125
126 /**
127 * Default value for Global UserDefinedWritability index
128 */
129 static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
130
131 /**
132 * Default value for GlobalChannel UserDefinedWritability index
133 */
134 static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
135
136 /**
137 * @param newTrafficCounter
138 * the TrafficCounter to set
139 */
140 void setTrafficCounter(TrafficCounter newTrafficCounter) {
141 trafficCounter = newTrafficCounter;
142 }
143
144 /**
145 * @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
146 * For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
147 * for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
148 * for GlobalChannel TSH it is defined as
149 * {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
150 */
151 protected int userDefinedWritabilityIndex() {
152 return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
153 }
154
/**
 * Creates a handler with explicit limits, check interval and maximum delay.
 *
 * @param writeLimit
 *            0 or a limit in bytes/s
 * @param readLimit
 *            0 or a limit in bytes/s
 * @param checkInterval
 *            The delay between two computations of performances for
 *            channels or 0 if no stats are to be computed.
 * @param maxTime
 *            The maximum delay to wait in case of traffic excess.
 *            Must be positive (validated with {@code checkPositive}).
 */
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
    // Validate first so that an invalid maxTime fails before anything else is assigned.
    this.maxTime = checkPositive(maxTime, "maxTime");

    // NOTE: this invokes an overridable method during construction, so overrides
    // must not depend on sub-class state.
    this.userDefinedWritabilityIndex = userDefinedWritabilityIndex();

    this.writeLimit = writeLimit;
    this.readLimit = readLimit;
    this.checkInterval = checkInterval;
}
175
/**
 * Creates a handler with explicit limits and check interval, using the default
 * maximum delay of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param writeLimit
 *            0 or a limit in bytes/s
 * @param readLimit
 *            0 or a limit in bytes/s
 * @param checkInterval
 *            The delay between two computations of performances for
 *            channels or 0 if no stats are to be computed.
 */
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
    this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
}
189
/**
 * Creates a handler with explicit limits, using the default check interval of
 * {@value #DEFAULT_CHECK_INTERVAL} ms and the default maximum delay of
 * {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param writeLimit
 *            0 or a limit in bytes/s
 * @param readLimit
 *            0 or a limit in bytes/s
 */
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
    this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
202
/**
 * Creates a handler with NO LIMIT, using the default check interval of
 * {@value #DEFAULT_CHECK_INTERVAL} ms and the default maximum delay of
 * {@value #DEFAULT_MAX_TIME} ms.
 */
protected AbstractTrafficShapingHandler() {
    this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
210
/**
 * Creates a handler with NO LIMIT and the given check interval, using the default
 * maximum delay of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param checkInterval
 *            The delay between two computations of performances for
 *            channels or 0 if no stats are to be computed.
 */
protected AbstractTrafficShapingHandler(long checkInterval) {
    this(0, 0, checkInterval, DEFAULT_MAX_TIME);
}
222
223 /**
224 * Change the underlying limitations and check interval.
225 * <p>Note the change will be taken as best effort, meaning
226 * that all already scheduled traffics will not be
227 * changed, but only applied to new traffics.</p>
228 * <p>So the expected usage of this method is to be used not too often,
229 * accordingly to the traffic shaping configuration.</p>
230 *
231 * @param newWriteLimit The new write limit (in bytes)
232 * @param newReadLimit The new read limit (in bytes)
233 * @param newCheckInterval The new check interval (in milliseconds)
234 */
235 public void configure(long newWriteLimit, long newReadLimit,
236 long newCheckInterval) {
237 configure(newWriteLimit, newReadLimit);
238 configure(newCheckInterval);
239 }
240
241 /**
242 * Change the underlying limitations.
243 * <p>Note the change will be taken as best effort, meaning
244 * that all already scheduled traffics will not be
245 * changed, but only applied to new traffics.</p>
246 * <p>So the expected usage of this method is to be used not too often,
247 * accordingly to the traffic shaping configuration.</p>
248 *
249 * @param newWriteLimit The new write limit (in bytes)
250 * @param newReadLimit The new read limit (in bytes)
251 */
252 public void configure(long newWriteLimit, long newReadLimit) {
253 writeLimit = newWriteLimit;
254 readLimit = newReadLimit;
255 if (trafficCounter != null) {
256 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
257 }
258 }
259
260 /**
261 * Change the check interval.
262 *
263 * @param newCheckInterval The new check interval (in milliseconds)
264 */
265 public void configure(long newCheckInterval) {
266 checkInterval = newCheckInterval;
267 if (trafficCounter != null) {
268 trafficCounter.configure(checkInterval);
269 }
270 }
271
272 /**
273 * @return the writeLimit
274 */
275 public long getWriteLimit() {
276 return writeLimit;
277 }
278
279 /**
280 * <p>Note the change will be taken as best effort, meaning
281 * that all already scheduled traffics will not be
282 * changed, but only applied to new traffics.</p>
283 * <p>So the expected usage of this method is to be used not too often,
284 * accordingly to the traffic shaping configuration.</p>
285 *
286 * @param writeLimit the writeLimit to set
287 */
288 public void setWriteLimit(long writeLimit) {
289 this.writeLimit = writeLimit;
290 if (trafficCounter != null) {
291 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
292 }
293 }
294
295 /**
296 * @return the readLimit
297 */
298 public long getReadLimit() {
299 return readLimit;
300 }
301
302 /**
303 * <p>Note the change will be taken as best effort, meaning
304 * that all already scheduled traffics will not be
305 * changed, but only applied to new traffics.</p>
306 * <p>So the expected usage of this method is to be used not too often,
307 * accordingly to the traffic shaping configuration.</p>
308 *
309 * @param readLimit the readLimit to set
310 */
311 public void setReadLimit(long readLimit) {
312 this.readLimit = readLimit;
313 if (trafficCounter != null) {
314 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
315 }
316 }
317
318 /**
319 * @return the checkInterval
320 */
321 public long getCheckInterval() {
322 return checkInterval;
323 }
324
325 /**
326 * @param checkInterval the interval in ms between each step check to set, default value being 1000 ms.
327 */
328 public void setCheckInterval(long checkInterval) {
329 this.checkInterval = checkInterval;
330 if (trafficCounter != null) {
331 trafficCounter.configure(checkInterval);
332 }
333 }
334
335 /**
336 * <p>Note the change will be taken as best effort, meaning
337 * that all already scheduled traffics will not be
338 * changed, but only applied to new traffics.</p>
339 * <p>So the expected usage of this method is to be used not too often,
340 * accordingly to the traffic shaping configuration.</p>
341 *
342 * @param maxTime
343 * Max delay in wait, shall be less than TIME OUT in related protocol.
344 * Must be positive.
345 */
346 public void setMaxTimeWait(long maxTime) {
347 this.maxTime = checkPositive(maxTime, "maxTime");
348 }
349
350 /**
351 * @return the max delay in wait to prevent TIME OUT
352 */
353 public long getMaxTimeWait() {
354 return maxTime;
355 }
356
357 /**
358 * @return the maxWriteDelay
359 */
360 public long getMaxWriteDelay() {
361 return maxWriteDelay;
362 }
363
364 /**
365 * <p>Note the change will be taken as best effort, meaning
366 * that all already scheduled traffics will not be
367 * changed, but only applied to new traffics.</p>
368 * <p>So the expected usage of this method is to be used not too often,
369 * accordingly to the traffic shaping configuration.</p>
370 *
371 * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
372 * Must be positive.
373 */
374 public void setMaxWriteDelay(long maxWriteDelay) {
375 this.maxWriteDelay = checkPositive(maxWriteDelay, "maxWriteDelay");
376 }
377
378 /**
379 * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
380 */
381 public long getMaxWriteSize() {
382 return maxWriteSize;
383 }
384
385 /**
386 * <p>Note that this limit is a best effort on memory limitation to prevent Out Of
387 * Memory Exception. To ensure it works, the handler generating the write should
388 * use one of the way provided by Netty to handle the capacity:</p>
389 * <p>- the {@code Channel.isWritable()} property and the corresponding
390 * {@code channelWritabilityChanged()}</p>
391 * <p>- the {@code Future.addListener(future -> ...)}</p>
392 *
393 * @param maxWriteSize the maximum Write Size allowed in the buffer
394 * per channel before write suspended is set,
395 * default being {@value #DEFAULT_MAX_SIZE} bytes.
396 */
397 public void setMaxWriteSize(long maxWriteSize) {
398 this.maxWriteSize = maxWriteSize;
399 }
400
401 /**
402 * Called each time the accounting is computed from the TrafficCounters.
403 * This method could be used for instance to implement almost real time accounting.
404 *
405 * @param counter
406 * the TrafficCounter that computes its performance
407 */
408 protected void doAccounting(TrafficCounter counter) {
409 // NOOP by default
410 }
411
412 /**
413 * Class to implement setReadable at fix time
414 */
415 static final class ReopenReadTimerTask implements Runnable {
416 final ChannelHandlerContext ctx;
417 ReopenReadTimerTask(ChannelHandlerContext ctx) {
418 this.ctx = ctx;
419 }
420
421 @Override
422 public void run() {
423 Channel channel = ctx.channel();
424 if (!channel.getOption(ChannelOption.AUTO_READ) && isHandlerActive(ctx)) {
425 // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
426 // Then Just reset the status
427 if (logger.isDebugEnabled()) {
428 logger.debug("Not unsuspend: " + channel.getOption(ChannelOption.AUTO_READ) + ':' +
429 isHandlerActive(ctx));
430 }
431 channel.attr(READ_SUSPENDED).set(false);
432 } else {
433 // Anything else allows the handler to reset the AutoRead
434 if (logger.isDebugEnabled()) {
435 if (channel.getOption(ChannelOption.AUTO_READ) && !isHandlerActive(ctx)) {
436 if (logger.isDebugEnabled()) {
437 logger.debug("Unsuspend: " + channel.getOption(ChannelOption.AUTO_READ) + ':' +
438 isHandlerActive(ctx));
439 }
440 } else {
441 if (logger.isDebugEnabled()) {
442 logger.debug("Normal unsuspend: " + channel.getOption(ChannelOption.AUTO_READ) + ':'
443 + isHandlerActive(ctx));
444 }
445 }
446 }
447 channel.attr(READ_SUSPENDED).set(false);
448 channel.setOption(ChannelOption.AUTO_READ, true);
449 channel.read();
450 }
451 if (logger.isDebugEnabled()) {
452 logger.debug("Unsuspend final status => " + channel.getOption(ChannelOption.AUTO_READ) + ':'
453 + isHandlerActive(ctx));
454 }
455 }
456 }
457
458 /**
459 * Release the Read suspension
460 */
461 void releaseReadSuspended(ChannelHandlerContext ctx) {
462 Channel channel = ctx.channel();
463 channel.attr(READ_SUSPENDED).set(false);
464 channel.setOption(ChannelOption.AUTO_READ, true);
465 }
466
467 @Override
468 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
469 long size = calculateSize(msg);
470 long now = TrafficCounter.milliSecondFromNano();
471 if (size > 0) {
472 // compute the number of ms to wait before reopening the channel
473 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
474 wait = checkWaitReadTime(ctx, wait, now);
475 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
476 // time in order to try to limit the traffic
477 // Only AutoRead AND HandlerActive True means Context Active
478 Channel channel = ctx.channel();
479 if (logger.isDebugEnabled()) {
480 logger.debug("Read suspend: " + wait + ':' + channel.getOption(ChannelOption.AUTO_READ) + ':'
481 + isHandlerActive(ctx));
482 }
483 if (channel.getOption(ChannelOption.AUTO_READ) && isHandlerActive(ctx)) {
484 channel.setOption(ChannelOption.AUTO_READ, false);
485 channel.attr(READ_SUSPENDED).set(true);
486 // Create a Runnable to reactive the read if needed. If one was create before it will just be
487 // reused to limit object creation
488 Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
489 Runnable reopenTask = attr.get();
490 if (reopenTask == null) {
491 reopenTask = new ReopenReadTimerTask(ctx);
492 attr.set(reopenTask);
493 }
494 ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
495 if (logger.isDebugEnabled()) {
496 logger.debug("Suspend final status => " + channel.getOption(ChannelOption.AUTO_READ) + ':'
497 + isHandlerActive(ctx) + " will reopened at: " + wait);
498 }
499 }
500 }
501 }
502 informReadOperation(ctx, now);
503 ctx.fireChannelRead(msg);
504 }
505
506 @Override
507 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
508 Channel channel = ctx.channel();
509 if (channel.hasAttr(REOPEN_TASK)) {
510 //release the reopen task
511 channel.attr(REOPEN_TASK).set(null);
512 }
513 }
514
515 /**
516 * Method overridden in GTSH to take into account specific timer for the channel.
517 * @param wait the wait delay computed in ms
518 * @param now the relative now time in ms
519 * @return the wait to use according to the context
520 */
521 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
522 // no change by default
523 return wait;
524 }
525
526 /**
527 * Method overridden in GTSH to take into account specific timer for the channel.
528 * @param now the relative now time in ms
529 */
530 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
531 // default noop
532 }
533
534 protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
535 Boolean suspended = ctx.channel().attr(READ_SUSPENDED).get();
536 return suspended == null || Boolean.FALSE.equals(suspended);
537 }
538
539 @Override
540 public void read(ChannelHandlerContext ctx) {
541 if (isHandlerActive(ctx)) {
542 // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
543 ctx.read();
544 }
545 }
546
547 @Override
548 public Future<Void> write(final ChannelHandlerContext ctx, final Object msg) {
549 long size = calculateSize(msg);
550 long now = TrafficCounter.milliSecondFromNano();
551 Promise<Void> promise = ctx.newPromise();
552 if (size > 0) {
553 // compute the number of ms to wait before continue with the channel
554 long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
555 if (wait >= MINIMAL_WAIT) {
556 if (logger.isDebugEnabled()) {
557 logger.debug("Write suspend: " + wait + ':'
558 + ctx.channel().getOption(ChannelOption.AUTO_READ) + ':'
559 + isHandlerActive(ctx));
560 }
561 submitWrite(ctx, msg, size, wait, now, promise);
562 return promise.asFuture();
563 }
564 }
565 // to maintain order of write
566 submitWrite(ctx, msg, size, 0, now, promise);
567 return promise.asFuture();
568 }
569
570 @Deprecated
571 protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
572 final long delay, final Promise<Void> promise) {
573 submitWrite(ctx, msg, calculateSize(msg),
574 delay, TrafficCounter.milliSecondFromNano(), promise);
575 }
576
577 abstract void submitWrite(
578 ChannelHandlerContext ctx, Object msg, long size, long delay, long now, Promise<Void> promise);
579
580 @Override
581 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
582 setUserDefinedWritability(ctx, true);
583 ctx.fireChannelRegistered();
584 }
585
586 // TODO: Fix me later!
587 void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
588 /**
589 ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
590 if (cob != null) {
591 cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
592 }
593 **/
594 }
595
596 /**
597 * Check the writability according to delay and size for the channel.
598 * Set if necessary setUserDefinedWritability status.
599 * @param delay the computed delay
600 * @param queueSize the current queueSize
601 */
602 void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
603 if (queueSize > maxWriteSize || delay > maxWriteDelay) {
604 setUserDefinedWritability(ctx, false);
605 }
606 }
607 /**
608 * Explicitly release the Write suspended status.
609 */
610 void releaseWriteSuspended(ChannelHandlerContext ctx) {
611 setUserDefinedWritability(ctx, true);
612 }
613
614 /**
615 * @return the current TrafficCounter (if
616 * channel is still connected)
617 */
618 public TrafficCounter trafficCounter() {
619 return trafficCounter;
620 }
621
622 @Override
623 public String toString() {
624 StringBuilder builder = new StringBuilder(290)
625 .append("TrafficShaping with Write Limit: ").append(writeLimit)
626 .append(" Read Limit: ").append(readLimit)
627 .append(" CheckInterval: ").append(checkInterval)
628 .append(" maxDelay: ").append(maxWriteDelay)
629 .append(" maxSize: ").append(maxWriteSize)
630 .append(" and Counter: ");
631 if (trafficCounter != null) {
632 builder.append(trafficCounter);
633 } else {
634 builder.append("none");
635 }
636 return builder.toString();
637 }
638
639 /**
640 * Calculate the size of the given {@link Object}.
641 *
642 * This implementation supports {@link Buffer} and {@link FileRegion}.
643 * Sub-classes may override this.
644 * @param msg the msg for which the size should be calculated.
645 * @return size the size of the msg or {@code -1} if unknown.
646 */
647 protected long calculateSize(Object msg) {
648 // TODO we should have a Sized interface to generalise all of this.
649 if (msg instanceof Buffer) {
650 return ((Buffer) msg).readableBytes();
651 }
652 if (msg instanceof FileRegion) {
653 return ((FileRegion) msg).count();
654 }
655 return -1;
656 }
657 }