1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelEvent;
20 import org.jboss.netty.channel.ChannelHandlerContext;
21 import org.jboss.netty.channel.ChannelState;
22 import org.jboss.netty.channel.ChannelStateEvent;
23 import org.jboss.netty.channel.MessageEvent;
24 import org.jboss.netty.channel.SimpleChannelHandler;
25 import org.jboss.netty.logging.InternalLogger;
26 import org.jboss.netty.logging.InternalLoggerFactory;
27 import org.jboss.netty.util.DefaultObjectSizeEstimator;
28 import org.jboss.netty.util.ExternalResourceReleasable;
29 import org.jboss.netty.util.ObjectSizeEstimator;
30 import org.jboss.netty.util.Timeout;
31 import org.jboss.netty.util.Timer;
32 import org.jboss.netty.util.TimerTask;
33
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37 /**
38 * AbstractTrafficShapingHandler allows to limit the global bandwidth
39 * (see {@link GlobalTrafficShapingHandler}) or per session
40 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41 * It allows too to implement an almost real time monitoring of the bandwidth using
42 * the monitors from {@link TrafficCounter} that will call back every checkInterval
43 * the method doAccounting of this handler.<br>
44 * <br>
45 *
46 * An {@link ObjectSizeEstimator} can be passed at construction to specify what
47 * is the size of the object to be read or write accordingly to the type of
48 * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
49 *
50 * If you want for any particular reasons to stop the monitoring (accounting) or to change
51 * the read/write limit or the check interval, several methods allow that for you:<br>
52 * <ul>
53 * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54 * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55 * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56 * <li></li>
57 * </ul>
58 */
59 public abstract class AbstractTrafficShapingHandler extends
60 SimpleChannelHandler implements ExternalResourceReleasable {
61 /**
62 * Internal logger
63 */
64 static InternalLogger logger = InternalLoggerFactory
65 .getInstance(AbstractTrafficShapingHandler.class);
66
67 /**
68 * Default delay between two checks: 1s
69 */
70 public static final long DEFAULT_CHECK_INTERVAL = 1000;
71 /**
72 * Default max delay in case of traffic shaping
73 * (during which no communication will occur).
74 * Shall be less than TIMEOUT. Here half of "standard" 30s
75 */
76 public static final long DEFAULT_MAX_TIME = 15000;
77
78 /**
79 * Default minimal time to wait
80 */
81 static final long MINIMAL_WAIT = 10;
82
83 /**
84 * Traffic Counter
85 */
86 protected TrafficCounter trafficCounter;
87
88 /**
89 * ObjectSizeEstimator
90 */
91 private ObjectSizeEstimator objectSizeEstimator;
92
93 /**
94 * Timer associated to any TrafficCounter
95 */
96 protected Timer timer;
97
98 /**
99 * used in releaseExternalResources() to cancel the timer
100 */
101 private volatile Timeout timeout;
102
103 /**
104 * Limit in B/s to apply to write
105 */
106 private long writeLimit;
107
108 /**
109 * Limit in B/s to apply to read
110 */
111 private long readLimit;
112
113 /**
114 * Delay between two performance snapshots
115 */
116 protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
117 /**
118 * Max delay in wait
119 */
120 protected long maxTime = DEFAULT_MAX_TIME; // default 15 s
121
122 /**
123 * Boolean associated with the release of this TrafficShapingHandler.
124 * It will be true only once when the releaseExternalRessources is called
125 * to prevent waiting when shutdown.
126 */
127 final AtomicBoolean release = new AtomicBoolean(false);
128
129 private void init(ObjectSizeEstimator newObjectSizeEstimator,
130 Timer newTimer, long newWriteLimit, long newReadLimit,
131 long newCheckInterval, long newMaxTime) {
132 objectSizeEstimator = newObjectSizeEstimator;
133 timer = newTimer;
134 writeLimit = newWriteLimit;
135 readLimit = newReadLimit;
136 checkInterval = newCheckInterval;
137 maxTime = newMaxTime;
138 //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
139 }
140
141 /**
142 *
143 * @param newTrafficCounter the TrafficCounter to set
144 */
145 void setTrafficCounter(TrafficCounter newTrafficCounter) {
146 trafficCounter = newTrafficCounter;
147 }
148
149 /**
150 * Constructor using default {@link ObjectSizeEstimator}
151 *
152 * @param timer
153 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
154 * @param writeLimit
155 * 0 or a limit in bytes/s
156 * @param readLimit
157 * 0 or a limit in bytes/s
158 * @param checkInterval
159 * The delay between two computations of performances for
160 * channels or 0 if no stats are to be computed
161 */
162 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
163 long readLimit, long checkInterval) {
164 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
165 DEFAULT_MAX_TIME);
166 }
167
168 /**
169 * Constructor using the specified ObjectSizeEstimator
170 *
171 * @param objectSizeEstimator
172 * the {@link ObjectSizeEstimator} that will be used to compute
173 * the size of the message
174 * @param timer
175 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
176 * @param writeLimit
177 * 0 or a limit in bytes/s
178 * @param readLimit
179 * 0 or a limit in bytes/s
180 * @param checkInterval
181 * The delay between two computations of performances for
182 * channels or 0 if no stats are to be computed
183 */
184 protected AbstractTrafficShapingHandler(
185 ObjectSizeEstimator objectSizeEstimator, Timer timer,
186 long writeLimit, long readLimit, long checkInterval) {
187 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
188 }
189
190 /**
191 * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
192 *
193 * @param timer
194 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
195 * @param writeLimit
196 * 0 or a limit in bytes/s
197 * @param readLimit
198 * 0 or a limit in bytes/s
199 */
200 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
201 long readLimit) {
202 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
203 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
204 }
205
206 /**
207 * Constructor using the specified ObjectSizeEstimator and using default Check Interval
208 *
209 * @param objectSizeEstimator
210 * the {@link ObjectSizeEstimator} that will be used to compute
211 * the size of the message
212 * @param timer
213 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
214 * @param writeLimit
215 * 0 or a limit in bytes/s
216 * @param readLimit
217 * 0 or a limit in bytes/s
218 */
219 protected AbstractTrafficShapingHandler(
220 ObjectSizeEstimator objectSizeEstimator, Timer timer,
221 long writeLimit, long readLimit) {
222 init(objectSizeEstimator, timer, writeLimit, readLimit,
223 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
224 }
225
226 /**
227 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
228 *
229 * @param timer
230 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
231 */
232 protected AbstractTrafficShapingHandler(Timer timer) {
233 init(new DefaultObjectSizeEstimator(), timer, 0, 0,
234 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
235 }
236
237 /**
238 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
239 *
240 * @param objectSizeEstimator
241 * the {@link ObjectSizeEstimator} that will be used to compute
242 * the size of the message
243 * @param timer
244 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
245 */
246 protected AbstractTrafficShapingHandler(
247 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
248 init(objectSizeEstimator, timer, 0, 0,
249 DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
250 }
251
252 /**
253 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
254 *
255 * @param timer
256 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
257 * @param checkInterval
258 * The delay between two computations of performances for
259 * channels or 0 if no stats are to be computed
260 */
261 protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
262 init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
263 }
264
265 /**
266 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
267 *
268 * @param objectSizeEstimator
269 * the {@link ObjectSizeEstimator} that will be used to compute
270 * the size of the message
271 * @param timer
272 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
273 * @param checkInterval
274 * The delay between two computations of performances for
275 * channels or 0 if no stats are to be computed
276 */
277 protected AbstractTrafficShapingHandler(
278 ObjectSizeEstimator objectSizeEstimator, Timer timer,
279 long checkInterval) {
280 init(objectSizeEstimator, timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
281 }
282
283 /**
284 * Constructor using default {@link ObjectSizeEstimator}
285 *
286 * @param timer
287 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
288 * @param writeLimit
289 * 0 or a limit in bytes/s
290 * @param readLimit
291 * 0 or a limit in bytes/s
292 * @param checkInterval
293 * The delay between two computations of performances for
294 * channels or 0 if no stats are to be computed
295 * @param maxTime
296 * The max time to wait in case of excess of traffic (to prevent Time Out event)
297 */
298 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
299 long readLimit, long checkInterval, long maxTime) {
300 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
301 maxTime);
302 }
303
304 /**
305 * Constructor using the specified ObjectSizeEstimator
306 *
307 * @param objectSizeEstimator
308 * the {@link ObjectSizeEstimator} that will be used to compute
309 * the size of the message
310 * @param timer
311 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
312 * @param writeLimit
313 * 0 or a limit in bytes/s
314 * @param readLimit
315 * 0 or a limit in bytes/s
316 * @param checkInterval
317 * The delay between two computations of performances for
318 * channels or 0 if no stats are to be computed
319 * @param maxTime
320 * The max time to wait in case of excess of traffic (to prevent Time Out event)
321 */
322 protected AbstractTrafficShapingHandler(
323 ObjectSizeEstimator objectSizeEstimator, Timer timer,
324 long writeLimit, long readLimit, long checkInterval, long maxTime) {
325 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
326 }
327
328 /**
329 * Change the underlying limitations and check interval.
330 */
331 public void configure(long newWriteLimit, long newReadLimit,
332 long newCheckInterval) {
333 configure(newWriteLimit, newReadLimit);
334 configure(newCheckInterval);
335 }
336
337 /**
338 * Change the underlying limitations.
339 */
340 public void configure(long newWriteLimit, long newReadLimit) {
341 writeLimit = newWriteLimit;
342 readLimit = newReadLimit;
343 if (trafficCounter != null) {
344 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
345 }
346 }
347
348 /**
349 * Change the check interval.
350 */
351 public void configure(long newCheckInterval) {
352 setCheckInterval(newCheckInterval);
353 }
354
355 /**
356 * @return the writeLimit
357 */
358 public long getWriteLimit() {
359 return writeLimit;
360 }
361
362 /**
363 * @param writeLimit the writeLimit to set
364 */
365 public void setWriteLimit(long writeLimit) {
366 this.writeLimit = writeLimit;
367 if (trafficCounter != null) {
368 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
369 }
370 }
371
372 /**
373 * @return the readLimit
374 */
375 public long getReadLimit() {
376 return readLimit;
377 }
378
379 /**
380 * @param readLimit the readLimit to set
381 */
382 public void setReadLimit(long readLimit) {
383 this.readLimit = readLimit;
384 if (trafficCounter != null) {
385 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
386 }
387 }
388
389 /**
390 * @return the checkInterval
391 */
392 public long getCheckInterval() {
393 return checkInterval;
394 }
395
396 /**
397 * @param newCheckInterval the checkInterval to set
398 */
399 public void setCheckInterval(long newCheckInterval) {
400 this.checkInterval = newCheckInterval;
401 if (trafficCounter != null) {
402 trafficCounter.configure(checkInterval);
403 }
404 }
405
406 /**
407 * @return the max delay on wait
408 */
409 public long getMaxTimeWait() {
410 return maxTime;
411 }
412
413 /**
414 *
415 * @param maxTime
416 * Max delay in wait, shall be less than TIME OUT in related protocol
417 */
418 public void setMaxTimeWait(long maxTime) {
419 this.maxTime = maxTime;
420 }
421
422 /**
423 * Called each time the accounting is computed from the TrafficCounters.
424 * This method could be used for instance to implement almost real time accounting.
425 *
426 * @param counter
427 * the TrafficCounter that computes its performance
428 */
429 protected void doAccounting(TrafficCounter counter) {
430 // NOOP by default
431 }
432
433 /**
434 * Class to implement setReadable at fix time
435 */
436 private class ReopenReadTimerTask implements TimerTask {
437 final ChannelHandlerContext ctx;
438 ReopenReadTimerTask(ChannelHandlerContext ctx) {
439 this.ctx = ctx;
440 }
441 public void run(Timeout timeoutArg) throws Exception {
442 //logger.warn("Start RRTT: "+release.get());
443 if (release.get()) {
444 return;
445 }
446 if (!ctx.getChannel().isReadable() && ctx.getAttachment() == null) {
447 // If isReadable is False and Active is True, user make a direct setReadable(false)
448 // Then Just reset the status
449 if (logger.isDebugEnabled()) {
450 logger.debug("Not Unsuspend: " + ctx.getChannel().isReadable() + ":" +
451 (ctx.getAttachment() == null));
452 }
453 ctx.setAttachment(null);
454 } else {
455 // Anything else allows the handler to reset the AutoRead
456 if (logger.isDebugEnabled()) {
457 if (ctx.getChannel().isReadable() && ctx.getAttachment() != null) {
458 logger.debug("Unsuspend: " + ctx.getChannel().isReadable() + ":" +
459 (ctx.getAttachment() == null));
460 } else {
461 logger.debug("Normal Unsuspend: " + ctx.getChannel().isReadable() + ":" +
462 (ctx.getAttachment() == null));
463 }
464 }
465 ctx.setAttachment(null);
466 ctx.getChannel().setReadable(true);
467 }
468 if (logger.isDebugEnabled()) {
469 logger.debug("Unsupsend final status => " + ctx.getChannel().isReadable() + ":" +
470 (ctx.getAttachment() == null));
471 }
472 }
473 }
474
475 @Override
476 public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
477 throws Exception {
478 try {
479 long size = objectSizeEstimator.estimateSize(evt.getMessage());
480 if (size > 0 && trafficCounter != null) {
481 // compute the number of ms to wait before reopening the channel
482 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime);
483 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
484 // time in order to try to limit the traffic
485 if (release.get()) {
486 return;
487 }
488 Channel channel = ctx.getChannel();
489 if (channel != null && channel.isConnected()) {
490 // Only AutoRead AND HandlerActive True means Context Active
491 if (logger.isDebugEnabled()) {
492 logger.debug("Read Suspend: " + wait + ":" + channel.isReadable() + ":" +
493 (ctx.getAttachment() == null));
494 }
495 if (timer == null) {
496 // Sleep since no executor
497 // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
498 Thread.sleep(wait);
499 return;
500 }
501 if (channel.isReadable() && ctx.getAttachment() == null) {
502 ctx.setAttachment(Boolean.TRUE);
503 channel.setReadable(false);
504 if (logger.isDebugEnabled()) {
505 logger.debug("Suspend final status => " + channel.isReadable() + ":" +
506 (ctx.getAttachment() == null));
507 }
508 // Create a Runnable to reactive the read if needed. If one was create before
509 // it will just be reused to limit object creation
510 TimerTask timerTask = new ReopenReadTimerTask(ctx);
511 timeout = timer.newTimeout(timerTask, wait,
512 TimeUnit.MILLISECONDS);
513 }
514 }
515 }
516 }
517 } finally {
518 // The message is then just passed to the next handler
519 super.messageReceived(ctx, evt);
520 }
521 }
522
523 @Override
524 public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
525 throws Exception {
526 long wait = 0;
527 try {
528 long size = objectSizeEstimator.estimateSize(evt.getMessage());
529 if (size > 0 && trafficCounter != null) {
530 // compute the number of ms to wait before continue with the channel
531 wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime);
532 if (logger.isDebugEnabled()) {
533 logger.debug("Write Suspend: " + wait + ":" + ctx.getChannel().isReadable() + ":" +
534 (ctx.getAttachment() == null));
535 }
536 if (wait >= MINIMAL_WAIT) {
537 if (release.get()) {
538 return;
539 }
540 /*
541 * Option 2:
542 * Thread.sleep(wait);
543 * System.out.println("Write unsuspended");
544 * Option 1: use an ordered list of messages to send
545 * Warning of memory pressure!
546 */
547 } else {
548 wait = 0;
549 }
550 }
551 } finally {
552 if (release.get()) {
553 return;
554 }
555 // The message is then just passed to the next handler
556 submitWrite(ctx, evt, wait);
557 }
558 }
559
560 protected void internalSubmitWrite(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
561 super.writeRequested(ctx, evt);
562 }
563
564 protected abstract void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
565 throws Exception;
566
567 @Override
568 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
569 throws Exception {
570 if (e instanceof ChannelStateEvent) {
571 ChannelStateEvent cse = (ChannelStateEvent) e;
572 if (cse.getState() == ChannelState.INTEREST_OPS &&
573 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
574
575 // setReadable(true) requested
576 boolean readSuspended = ctx.getAttachment() != null;
577 if (readSuspended) {
578 // Drop the request silently if this handler has
579 // set the flag.
580 e.getFuture().setSuccess();
581 return;
582 }
583 }
584 }
585 super.handleDownstream(ctx, e);
586 }
587
588 /**
589 *
590 * @return the current TrafficCounter (if
591 * channel is still connected)
592 */
593 public TrafficCounter getTrafficCounter() {
594 return trafficCounter;
595 }
596
597 public void releaseExternalResources() {
598 if (trafficCounter != null) {
599 trafficCounter.stop();
600 }
601 release.set(true);
602 if (timeout != null) {
603 timeout.cancel();
604 }
605 //shall be done outside (since it can be shared): timer.stop();
606 }
607
608 @Override
609 public String toString() {
610 return "TrafficShaping with Write Limit: " + writeLimit +
611 " Read Limit: " + readLimit + " every: " + checkInterval + " and Counter: " +
612 (trafficCounter != null? trafficCounter.toString() : "none");
613 }
614 }