1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.Channel;
20 import org.jboss.netty.channel.ChannelHandlerContext;
21 import org.jboss.netty.channel.ChannelStateEvent;
22 import org.jboss.netty.channel.MessageEvent;
23 import org.jboss.netty.channel.SimpleChannelHandler;
24 import org.jboss.netty.logging.InternalLogger;
25 import org.jboss.netty.logging.InternalLoggerFactory;
26 import org.jboss.netty.util.DefaultObjectSizeEstimator;
27 import org.jboss.netty.util.ExternalResourceReleasable;
28 import org.jboss.netty.util.ObjectSizeEstimator;
29 import org.jboss.netty.util.Timeout;
30 import org.jboss.netty.util.Timer;
31 import org.jboss.netty.util.TimerTask;
32
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35
36 /**
 * <p>AbstractTrafficShapingHandler makes it possible to limit the global bandwidth
 * (see {@link GlobalTrafficShapingHandler}) or the per-session
 * bandwidth (see {@link ChannelTrafficShapingHandler}) through traffic shaping.
 * It also makes it possible to implement almost real-time monitoring of the bandwidth
 * using the monitors from {@link TrafficCounter}, which call back the
 * doAccounting method of this handler every checkInterval.</p>
43 *
 * <p>An {@link ObjectSizeEstimator} can be passed at construction to specify how
 * the size of an object being read or written is computed according to its type.
 * If none is specified, a {@link SimpleObjectSizeEstimator} (which extends
 * {@link DefaultObjectSizeEstimator}) is used.</p>
47 *
 * <p>If, for any particular reason, you want to stop the monitoring (accounting) or to change
 * the read/write limits or the check interval, several methods let you do so:</p>
50 * <ul>
51 * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
52 * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
53 * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
54 * </ul>
55 */
56 public abstract class AbstractTrafficShapingHandler extends
57 SimpleChannelHandler implements ExternalResourceReleasable {
58 /**
59 * Internal logger
60 */
61 static InternalLogger logger = InternalLoggerFactory
62 .getInstance(AbstractTrafficShapingHandler.class);
63
64 /**
65 * Default delay between two checks: 1s
66 */
67 public static final long DEFAULT_CHECK_INTERVAL = 1000;
68
69 /**
70 * Default max delay in case of traffic shaping
71 * (during which no communication will occur).
72 * Shall be less than TIMEOUT. Here half of "standard" 30s
73 */
74 public static final long DEFAULT_MAX_TIME = 15000;
75
76 /**
77 * Default max size to not exceed in buffer (write only).
78 */
79 static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
80
81 /**
82 * Default minimal time to wait
83 */
84 static final long MINIMAL_WAIT = 10;
85
86 static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
87 static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
88 static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
89
90 /**
91 * Traffic Counter
92 */
93 protected TrafficCounter trafficCounter;
94
95 /**
96 * ObjectSizeEstimator
97 */
98 private ObjectSizeEstimator objectSizeEstimator;
99
100 /**
101 * Timer associated to any TrafficCounter
102 */
103 protected Timer timer;
104
105 /**
106 * used in releaseExternalResources() to cancel the timer
107 */
108 volatile Timeout timeout;
109
110 /**
111 * Limit in B/s to apply to write
112 */
113 private volatile long writeLimit;
114
115 /**
116 * Limit in B/s to apply to read
117 */
118 private volatile long readLimit;
119
120 /**
121 * Delay between two performance snapshots
122 */
123 protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
124
125 /**
126 * Max delay in wait
127 */
128 protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s
129
130 /**
131 * Max time to delay before proposing to stop writing new objects from next handlers
132 */
133 volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
134
135 /**
136 * Max size in the list before proposing to stop writing new objects from next handlers
137 */
138 volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
139
140 /**
141 * Boolean associated with the release of this TrafficShapingHandler.
142 * It will be true only once when the releaseExternalRessources is called
143 * to prevent waiting when shutdown.
144 */
145 final AtomicBoolean release = new AtomicBoolean(false);
146 final int index;
147
148 /**
149 * Attachment of ChannelHandlerContext
150 *
151 */
152 static final class ReadWriteStatus {
153 volatile boolean readSuspend;
154 volatile TimerTask reopenReadTimerTask;
155 }
156
157 /**
158 * For simple ChannelBuffer, returns the readableBytes, else
159 * use standard DefaultObjectSizeEstimator.
160 */
161 public static class SimpleObjectSizeEstimator extends DefaultObjectSizeEstimator {
162 @Override
163 public int estimateSize(Object o) {
164 int size;
165 if (o instanceof ChannelBuffer) {
166 size = ((ChannelBuffer) o).readableBytes();
167 } else {
168 size = super.estimateSize(o);
169 }
170 return size;
171 }
172 }
173
174 /**
175 * @return the index to be used by the TrafficShapingHandler to manage the user defined
176 * writability. For Channel TSH it is defined as
177 * {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}, for Global TSH it is
178 * defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
179 * for GlobalChannel TSH it is defined as
180 * {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
181 */
182 int userDefinedWritabilityIndex() {
183 if (this instanceof GlobalChannelTrafficShapingHandler) {
184 return GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
185 } else if (this instanceof GlobalTrafficShapingHandler) {
186 return GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
187 } else {
188 return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
189 }
190 }
191
192 private void init(ObjectSizeEstimator newObjectSizeEstimator,
193 Timer newTimer, long newWriteLimit, long newReadLimit,
194 long newCheckInterval, long newMaxTime) {
195 if (newMaxTime <= 0) {
196 throw new IllegalArgumentException("maxTime must be positive");
197 }
198 objectSizeEstimator = newObjectSizeEstimator;
199 timer = newTimer;
200 writeLimit = newWriteLimit;
201 readLimit = newReadLimit;
202 checkInterval = newCheckInterval;
203 maxTime = newMaxTime;
204 //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
205 }
206
207 /**
208 * @param newTrafficCounter the TrafficCounter to set
209 */
210 void setTrafficCounter(TrafficCounter newTrafficCounter) {
211 trafficCounter = newTrafficCounter;
212 }
213
/**
 * Creates a handler with a new {@link SimpleObjectSizeEstimator} and the
 * default max time of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 * @param writeLimit
 *          0 or a limit in bytes/s
 * @param readLimit
 *          0 or a limit in bytes/s
 * @param checkInterval
 *          The delay between two computations of performances for
 *          channels or 0 if no stats are to be computed.
 */
protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
        long readLimit, long checkInterval) {
    this(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
            DEFAULT_MAX_TIME);
}
234
/**
 * Creates a handler with the given ObjectSizeEstimator and the
 * default max time of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param objectSizeEstimator
 *          the {@link ObjectSizeEstimator} that will be used to compute
 *          the size of the message.
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 * @param writeLimit
 *          0 or a limit in bytes/s
 * @param readLimit
 *          0 or a limit in bytes/s
 * @param checkInterval
 *          The delay between two computations of performances for
 *          channels or 0 if no stats are to be computed.
 */
protected AbstractTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator, Timer timer,
        long writeLimit, long readLimit, long checkInterval) {
    this(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
}
258
/**
 * Creates a handler with a new {@link SimpleObjectSizeEstimator}, the
 * default check interval of {@value #DEFAULT_CHECK_INTERVAL} ms and the
 * default max time of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 * @param writeLimit
 *          0 or a limit in bytes/s
 * @param readLimit
 *          0 or a limit in bytes/s
 */
protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
        long readLimit) {
    this(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit,
            DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
277
/**
 * Creates a handler with the given ObjectSizeEstimator, the
 * default check interval of {@value #DEFAULT_CHECK_INTERVAL} ms and the
 * default max time of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param objectSizeEstimator
 *          the {@link ObjectSizeEstimator} that will be used to compute
 *          the size of the message.
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 * @param writeLimit
 *          0 or a limit in bytes/s
 * @param readLimit
 *          0 or a limit in bytes/s
 */
protected AbstractTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator, Timer timer,
        long writeLimit, long readLimit) {
    this(objectSizeEstimator, timer, writeLimit, readLimit,
            DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
300
/**
 * Creates a handler with a new {@link SimpleObjectSizeEstimator}, NO LIMIT, the
 * default check interval of {@value #DEFAULT_CHECK_INTERVAL} ms and the
 * default max time of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 */
protected AbstractTrafficShapingHandler(Timer timer) {
    this(new SimpleObjectSizeEstimator(), timer, 0, 0,
            DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
314
/**
 * Creates a handler with the given ObjectSizeEstimator, NO LIMIT, the
 * default check interval of {@value #DEFAULT_CHECK_INTERVAL} ms and the
 * default max time of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param objectSizeEstimator
 *          the {@link ObjectSizeEstimator} that will be used to compute
 *          the size of the message.
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 */
protected AbstractTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator, Timer timer) {
    this(objectSizeEstimator, timer, 0, 0,
            DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
332
/**
 * Creates a handler with a new {@link SimpleObjectSizeEstimator}, NO LIMIT and the
 * default max time of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 * @param checkInterval
 *          The delay between two computations of performances for
 *          channels or 0 if no stats are to be computed.
 */
protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
    this(new SimpleObjectSizeEstimator(), timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
}
347
/**
 * Creates a handler with the given ObjectSizeEstimator, NO LIMIT and the
 * default max time of {@value #DEFAULT_MAX_TIME} ms.
 *
 * @param objectSizeEstimator
 *          the {@link ObjectSizeEstimator} that will be used to compute
 *          the size of the message.
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 * @param checkInterval
 *          The delay between two computations of performances for
 *          channels or 0 if no stats are to be computed.
 */
protected AbstractTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator, Timer timer,
        long checkInterval) {
    this(objectSizeEstimator, timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
}
367
/**
 * Creates a handler with a new {@link SimpleObjectSizeEstimator}.
 *
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 * @param writeLimit
 *          0 or a limit in bytes/s
 * @param readLimit
 *          0 or a limit in bytes/s
 * @param checkInterval
 *          The delay between two computations of performances for
 *          channels or 0 if no stats are to be computed.
 * @param maxTime
 *          The max time to wait in case of excess of traffic (to prevent Time Out event).
 *          Must be positive.
 */
protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
        long readLimit, long checkInterval, long maxTime) {
    this(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
            maxTime);
}
390
/**
 * Creates a handler from explicit values for every setting.
 *
 * @param objectSizeEstimator
 *          the {@link ObjectSizeEstimator} that will be used to compute
 *          the size of the message.
 * @param timer
 *          created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
 * @param writeLimit
 *          0 or a limit in bytes/s
 * @param readLimit
 *          0 or a limit in bytes/s
 * @param checkInterval
 *          The delay between two computations of performances for
 *          channels or 0 if no stats are to be computed.
 * @param maxTime
 *          The max time to wait in case of excess of traffic (to prevent Time Out event).
 *          Must be positive.
 * @throws IllegalArgumentException if {@code maxTime} is not positive
 */
protected AbstractTrafficShapingHandler(
        ObjectSizeEstimator objectSizeEstimator, Timer timer,
        long writeLimit, long readLimit, long checkInterval, long maxTime) {
    this.index = userDefinedWritabilityIndex();
    init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
}
416
417 /**
418 * Change the underlying limitations and check interval.
419 * <p>Note the change will be taken as best effort, meaning
420 * that all already scheduled traffics will not be
421 * changed, but only applied to new traffics.</p>
422 * So the expected usage of this method is to be used not too often,
423 * accordingly to the traffic shaping configuration.
424 *
425 * @param newWriteLimit
426 * The new write limit (in bytes)
427 * @param newReadLimit
428 * The new read limit (in bytes)
429 * @param newCheckInterval
430 * The new check interval (in milliseconds)
431 */
432 public void configure(long newWriteLimit, long newReadLimit,
433 long newCheckInterval) {
434 configure(newWriteLimit, newReadLimit);
435 configure(newCheckInterval);
436 }
437
438 /**
439 * Change the underlying limitations.
440 * <p>Note the change will be taken as best effort, meaning
441 * that all already scheduled traffics will not be
442 * changed, but only applied to new traffics.</p>
443 * So the expected usage of this method is to be used not too often,
444 * accordingly to the traffic shaping configuration.
445 *
446 * @param newWriteLimit
447 * The new write limit (in bytes)
448 * @param newReadLimit
449 * The new read limit (in bytes)
450 */
451 public void configure(long newWriteLimit, long newReadLimit) {
452 writeLimit = newWriteLimit;
453 readLimit = newReadLimit;
454 if (trafficCounter != null) {
455 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
456 }
457 }
458
459 /**
460 * Change the check interval.
461 */
462 public void configure(long newCheckInterval) {
463 setCheckInterval(newCheckInterval);
464 }
465
466 /**
467 * @return the writeLimit
468 */
469 public long getWriteLimit() {
470 return writeLimit;
471 }
472
473 /**
474 * <p>Note the change will be taken as best effort, meaning
475 * that all already scheduled traffics will not be
476 * changed, but only applied to new traffics.</p>
477 * So the expected usage of this method is to be used not too often,
478 * accordingly to the traffic shaping configuration.
479 *
480 * @param writeLimit the writeLimit to set
481 */
482 public void setWriteLimit(long writeLimit) {
483 this.writeLimit = writeLimit;
484 if (trafficCounter != null) {
485 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
486 }
487 }
488
489 /**
490 * @return the readLimit
491 */
492 public long getReadLimit() {
493 return readLimit;
494 }
495
496 /**
497 * <p>Note the change will be taken as best effort, meaning
498 * that all already scheduled traffics will not be
499 * changed, but only applied to new traffics.</p>
500 * So the expected usage of this method is to be used not too often,
501 * accordingly to the traffic shaping configuration.
502 *
503 * @param readLimit the readLimit to set
504 */
505 public void setReadLimit(long readLimit) {
506 this.readLimit = readLimit;
507 if (trafficCounter != null) {
508 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
509 }
510 }
511
512 /**
513 * @return the checkInterval
514 */
515 public long getCheckInterval() {
516 return checkInterval;
517 }
518
519 /**
520 * @param newCheckInterval the checkInterval to set
521 */
522 public void setCheckInterval(long newCheckInterval) {
523 checkInterval = newCheckInterval;
524 if (trafficCounter != null) {
525 trafficCounter.configure(checkInterval);
526 }
527 }
528
529 /**
530 * @return the max delay on wait
531 */
532 public long getMaxTimeWait() {
533 return maxTime;
534 }
535
536 /**
537 * <p>Note the change will be taken as best effort, meaning
538 * that all already scheduled traffics will not be
539 * changed, but only applied to new traffics.</p>
540 * So the expected usage of this method is to be used not too often,
541 * accordingly to the traffic shaping configuration.
542 *
543 * @param maxTime
544 * Max delay in wait, shall be less than TIME OUT in related protocol.
545 * Must be positive.
546 */
547 public void setMaxTimeWait(long maxTime) {
548 if (maxTime <= 0) {
549 throw new IllegalArgumentException("maxTime must be positive");
550 }
551 this.maxTime = maxTime;
552 }
553
554 /**
555 * @return the maxWriteDelay
556 */
557 public long getMaxWriteDelay() {
558 return maxWriteDelay;
559 }
560
561 /**
562 * <p>Note the change will be taken as best effort, meaning
563 * that all already scheduled traffics will not be
564 * changed, but only applied to new traffics.</p>
565 * So the expected usage of this method is to be used not too often,
566 * accordingly to the traffic shaping configuration.
567 *
568 * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspended is set.
569 * Must be positive.
570 */
571 public void setMaxWriteDelay(long maxWriteDelay) {
572 if (maxWriteDelay <= 0) {
573 throw new IllegalArgumentException("maxWriteDelay must be positive");
574 }
575 this.maxWriteDelay = maxWriteDelay;
576 }
577
578 /**
579 * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes
580 */
581 public long getMaxWriteSize() {
582 return maxWriteSize;
583 }
584
585 /**
586 * <p>Note the change will be taken as best effort, meaning
587 * that all already scheduled traffics will not be
588 * changed, but only applied to new traffics.</p>
589 * So the expected usage of this method is to be used not too often,
590 * accordingly to the traffic shaping configuration.
591 *
592 * @param maxWriteSize the maximum Write Size allowed in the buffer
593 * per channel before write suspended is set,
594 * default being {@value #DEFAULT_MAX_SIZE} bytes
595 */
596 public void setMaxWriteSize(long maxWriteSize) {
597 this.maxWriteSize = maxWriteSize;
598 }
599
600 /**
601 * Called each time the accounting is computed from the TrafficCounters.
602 * This method could be used for instance to implement almost real time accounting.
603 *
604 * @param counter
605 * the TrafficCounter that computes its performance
606 */
607 protected void doAccounting(TrafficCounter counter) {
608 // NOOP by default
609 }
610
611 /**
612 * Class to implement setReadable at fix time.
613 */
614 class ReopenReadTimerTask implements TimerTask {
615 final ChannelHandlerContext ctx;
616 ReopenReadTimerTask(ChannelHandlerContext ctx) {
617 this.ctx = ctx;
618 }
619 public void run(Timeout timeoutArg) throws Exception {
620 //logger.warn("Start RRTT: "+release.get());
621 if (release.get()) {
622 return;
623 }
624 ReadWriteStatus rws = checkAttachment(ctx);
625 Channel channel = ctx.getChannel();
626 if (! channel.isConnected()) {
627 // ignore
628 return;
629 }
630 if (!channel.isReadable() && ! rws.readSuspend) {
631 // If isReadable is False and Active is True, user make a direct setReadable(false)
632 // Then Just reset the status
633 if (logger.isDebugEnabled()) {
634 logger.debug("Not unsuspend: " + channel.isReadable() + ':' +
635 rws.readSuspend);
636 }
637 rws.readSuspend = false;
638 } else {
639 // Anything else allows the handler to reset the AutoRead
640 if (logger.isDebugEnabled()) {
641 if (channel.isReadable() && rws.readSuspend) {
642 logger.debug("Unsuspend: " + channel.isReadable() + ':' +
643 rws.readSuspend);
644 } else {
645 logger.debug("Normal unsuspend: " + channel.isReadable() + ':' +
646 rws.readSuspend);
647 }
648 }
649 rws.readSuspend = false;
650 channel.setReadable(true);
651 }
652 if (logger.isDebugEnabled()) {
653 logger.debug("Unsupsend final status => " + channel.isReadable() + ':' +
654 rws.readSuspend);
655 }
656 }
657 }
658
659 /**
660 * Release the Read suspension.
661 */
662 void releaseReadSuspended(ChannelHandlerContext ctx) {
663 ReadWriteStatus rws = checkAttachment(ctx);
664 rws.readSuspend = false;
665 ctx.getChannel().setReadable(true);
666 }
667
668 @Override
669 public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
670 throws Exception {
671 long now = TrafficCounter.milliSecondFromNano();
672 try {
673 ReadWriteStatus rws = checkAttachment(ctx);
674 long size = calculateSize(evt.getMessage());
675 if (size > 0 && trafficCounter != null) {
676 // compute the number of ms to wait before reopening the channel
677 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
678 wait = checkWaitReadTime(ctx, wait, now);
679 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
680 // time in order to try to limit the traffic
681 if (release.get()) {
682 return;
683 }
684 Channel channel = ctx.getChannel();
685 if (channel != null && channel.isConnected()) {
686 // Only AutoRead AND HandlerActive True means Context Active
687 if (logger.isDebugEnabled()) {
688 logger.debug("Read suspend: " + wait + ':' + channel.isReadable() + ':' +
689 rws.readSuspend);
690 }
691 if (timer == null) {
692 // Sleep since no executor
693 // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
694 Thread.sleep(wait);
695 return;
696 }
697 if (channel.isReadable() && ! rws.readSuspend) {
698 rws.readSuspend = true;
699 channel.setReadable(false);
700 if (logger.isDebugEnabled()) {
701 logger.debug("Suspend final status => " + channel.isReadable() + ':' +
702 rws.readSuspend);
703 }
704 // Create a Runnable to reactive the read if needed. If one was create before
705 // it will just be reused to limit object creation
706 if (rws.reopenReadTimerTask == null) {
707 rws.reopenReadTimerTask = new ReopenReadTimerTask(ctx);
708 }
709 timeout = timer.newTimeout(rws.reopenReadTimerTask, wait,
710 TimeUnit.MILLISECONDS);
711 }
712 }
713 }
714 }
715 } finally {
716 informReadOperation(ctx, now);
717 // The message is then just passed to the next handler
718 ctx.sendUpstream(evt);
719 }
720 }
721
722 /**
723 * Method overridden in GTSH to take into account specific timer for the channel.
724 * @param wait the wait delay computed in ms
725 * @param now the relative now time in ms
726 * @return the wait to use according to the context.
727 */
728 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
729 // no change by default
730 return wait;
731 }
732
733 /**
734 * Method overridden in GTSH to take into account specific timer for the channel.
735 * @param now the relative now time in ms
736 */
737 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
738 // default noop
739 }
740
741 @Override
742 public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
743 throws Exception {
744 long wait = 0;
745 long size = calculateSize(evt.getMessage());
746 long now = TrafficCounter.milliSecondFromNano();
747 Channel channel = ctx.getChannel();
748 try {
749 if (size > 0 && trafficCounter != null) {
750 // compute the number of ms to wait before continue with the channel
751 wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
752 if (logger.isDebugEnabled()) {
753 logger.debug("Write suspend: " + wait + ':' + channel.isWritable() + ':' +
754 channel.getUserDefinedWritability(index));
755 }
756 if (wait < MINIMAL_WAIT || release.get()) {
757 wait = 0;
758 }
759 }
760 } finally {
761 // The message is scheduled
762 submitWrite(ctx, evt, size, wait, now);
763 }
764 }
765
766 @Deprecated
767 protected void internalSubmitWrite(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
768 ctx.sendDownstream(evt);
769 }
770
771 @Deprecated
772 protected void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
773 final long delay) throws Exception {
774 submitWrite(ctx, evt, calculateSize(evt.getMessage()), delay, TrafficCounter.milliSecondFromNano());
775 }
776
777 abstract void submitWrite(ChannelHandlerContext ctx, MessageEvent evt, long size,
778 long delay, long now) throws Exception;
779
780 void setWritable(ChannelHandlerContext ctx, boolean writable) {
781 Channel channel = ctx.getChannel();
782 if (channel.isConnected()) {
783 channel.setUserDefinedWritability(index, writable);
784 }
785 }
786
787 /**
788 * Check the writability according to delay and size for the channel.
789 * Set if necessary the write suspended status.
790 * @param delay the computed delai
791 * @param queueSize the current queueSize
792 */
793 void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
794 if (queueSize > maxWriteSize || delay > maxWriteDelay) {
795 setWritable(ctx, false);
796 }
797 }
798
799 /**
800 * Explicitly release the Write suspended status.
801 */
802 void releaseWriteSuspended(ChannelHandlerContext ctx) {
803 setWritable(ctx, true);
804 }
805
806 /**
807 * @return the current TrafficCounter (if
808 * channel is still connected).
809 */
810 public TrafficCounter getTrafficCounter() {
811 return trafficCounter;
812 }
813
814 public void releaseExternalResources() {
815 if (trafficCounter != null) {
816 trafficCounter.stop();
817 }
818 release.set(true);
819 if (timeout != null) {
820 timeout.cancel();
821 }
822 //shall be done outside (since it can be shared): timer.stop();
823 }
824
825 static ReadWriteStatus checkAttachment(ChannelHandlerContext ctx) {
826 ReadWriteStatus rws = (ReadWriteStatus) ctx.getAttachment();
827 if (rws == null) {
828 rws = new ReadWriteStatus();
829 ctx.setAttachment(rws);
830 }
831 return rws;
832 }
833
834 @Override
835 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
836 checkAttachment(ctx);
837 setWritable(ctx, true);
838 super.channelConnected(ctx, e);
839 }
840
841 protected long calculateSize(Object obj) {
842 //logger.debug("Size: "+size);
843 return objectSizeEstimator.estimateSize(obj);
844 }
845
846 @Override
847 public String toString() {
848 StringBuilder builder = new StringBuilder(290)
849 .append("TrafficShaping with Write Limit: ").append(writeLimit)
850 .append(" Read Limit: ").append(readLimit)
851 .append(" CheckInterval: ").append(checkInterval)
852 .append(" maxDelay: ").append(maxWriteDelay)
853 .append(" maxSize: ").append(maxWriteSize)
854 .append(" and Counter: ");
855 if (trafficCounter != null) {
856 builder.append(trafficCounter);
857 } else {
858 builder.append("none");
859 }
860 return builder.toString();
861 }
862 }