1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.traffic;
17
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelEvent;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.ChannelState;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.MessageEvent;
27 import org.jboss.netty.channel.SimpleChannelHandler;
28 import org.jboss.netty.logging.InternalLogger;
29 import org.jboss.netty.logging.InternalLoggerFactory;
30 import org.jboss.netty.util.DefaultObjectSizeEstimator;
31 import org.jboss.netty.util.ExternalResourceReleasable;
32 import org.jboss.netty.util.ObjectSizeEstimator;
33 import org.jboss.netty.util.Timeout;
34 import org.jboss.netty.util.Timer;
35 import org.jboss.netty.util.TimerTask;
36
37 /**
38 * AbstractTrafficShapingHandler allows to limit the global bandwidth
39 * (see {@link GlobalTrafficShapingHandler}) or per session
40 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41 * It allows too to implement an almost real time monitoring of the bandwidth using
42 * the monitors from {@link TrafficCounter} that will call back every checkInterval
43 * the method doAccounting of this handler.<br>
44 * <br>
45 *
46 * An {@link ObjectSizeEstimator} can be passed at construction to specify what
47 * is the size of the object to be read or write accordingly to the type of
48 * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
49 *
50 * If you want for any particular reasons to stop the monitoring (accounting) or to change
51 * the read/write limit or the check interval, several methods allow that for you:<br>
52 * <ul>
53 * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54 * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55 * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56 * <li></li>
57 * </ul>
58 */
59 public abstract class AbstractTrafficShapingHandler extends
60 SimpleChannelHandler implements ExternalResourceReleasable {
61 /**
62 * Internal logger
63 */
64 static InternalLogger logger = InternalLoggerFactory
65 .getInstance(AbstractTrafficShapingHandler.class);
66
67 /**
68 * Default delay between two checks: 1s
69 */
70 public static final long DEFAULT_CHECK_INTERVAL = 1000;
71
72 /**
73 * Default minimal time to wait
74 */
75 private static final long MINIMAL_WAIT = 10;
76
77 /**
78 * Traffic Counter
79 */
80 protected TrafficCounter trafficCounter;
81
82 /**
83 * ObjectSizeEstimator
84 */
85 private ObjectSizeEstimator objectSizeEstimator;
86
87 /**
88 * Timer to associated to any TrafficCounter
89 */
90 protected Timer timer;
91
92 /**
93 * used in releaseExternalResources() to cancel the timer
94 */
95 private volatile Timeout timeout;
96
97 /**
98 * Limit in B/s to apply to write
99 */
100 private long writeLimit;
101
102 /**
103 * Limit in B/s to apply to read
104 */
105 private long readLimit;
106
107 /**
108 * Delay between two performance snapshots
109 */
110 protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
111
112 /**
113 * Boolean associated with the release of this TrafficShapingHandler.
114 * It will be true only once when the releaseExternalRessources is called
115 * to prevent waiting when shutdown.
116 */
117 final AtomicBoolean release = new AtomicBoolean(false);
118
119 private void init(ObjectSizeEstimator newObjectSizeEstimator,
120 Timer newTimer, long newWriteLimit, long newReadLimit,
121 long newCheckInterval) {
122 objectSizeEstimator = newObjectSizeEstimator;
123 timer = newTimer;
124 writeLimit = newWriteLimit;
125 readLimit = newReadLimit;
126 checkInterval = newCheckInterval;
127 //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
128 }
129
130 /**
131 *
132 * @param newTrafficCounter the TrafficCounter to set
133 */
134 void setTrafficCounter(TrafficCounter newTrafficCounter) {
135 trafficCounter = newTrafficCounter;
136 }
137
138 /**
139 * Constructor using default {@link ObjectSizeEstimator}
140 *
141 * @param timer
142 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
143 * @param writeLimit
144 * 0 or a limit in bytes/s
145 * @param readLimit
146 * 0 or a limit in bytes/s
147 * @param checkInterval
148 * The delay between two computations of performances for
149 * channels or 0 if no stats are to be computed
150 */
151 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
152 long readLimit, long checkInterval) {
153 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
154 }
155
156 /**
157 * Constructor using the specified ObjectSizeEstimator
158 *
159 * @param objectSizeEstimator
160 * the {@link ObjectSizeEstimator} that will be used to compute
161 * the size of the message
162 * @param timer
163 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
164 * @param writeLimit
165 * 0 or a limit in bytes/s
166 * @param readLimit
167 * 0 or a limit in bytes/s
168 * @param checkInterval
169 * The delay between two computations of performances for
170 * channels or 0 if no stats are to be computed
171 */
172 protected AbstractTrafficShapingHandler(
173 ObjectSizeEstimator objectSizeEstimator, Timer timer,
174 long writeLimit, long readLimit, long checkInterval) {
175 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
176 }
177
178 /**
179 * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
180 *
181 * @param timer
182 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
183 * @param writeLimit
184 * 0 or a limit in bytes/s
185 * @param readLimit
186 * 0 or a limit in bytes/s
187 */
188 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
189 long readLimit) {
190 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
191 }
192
193 /**
194 * Constructor using the specified ObjectSizeEstimator and using default Check Interval
195 *
196 * @param objectSizeEstimator
197 * the {@link ObjectSizeEstimator} that will be used to compute
198 * the size of the message
199 * @param timer
200 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
201 * @param writeLimit
202 * 0 or a limit in bytes/s
203 * @param readLimit
204 * 0 or a limit in bytes/s
205 */
206 protected AbstractTrafficShapingHandler(
207 ObjectSizeEstimator objectSizeEstimator, Timer timer,
208 long writeLimit, long readLimit) {
209 init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
210 }
211
212 /**
213 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
214 *
215 * @param timer
216 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
217 */
218 protected AbstractTrafficShapingHandler(Timer timer) {
219 init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
220 }
221
222 /**
223 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
224 *
225 * @param objectSizeEstimator
226 * the {@link ObjectSizeEstimator} that will be used to compute
227 * the size of the message
228 * @param timer
229 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
230 */
231 protected AbstractTrafficShapingHandler(
232 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
233 init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
234 }
235
236 /**
237 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
238 *
239 * @param timer
240 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
241 * @param checkInterval
242 * The delay between two computations of performances for
243 * channels or 0 if no stats are to be computed
244 */
245 protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
246 init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
247 }
248
249 /**
250 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
251 *
252 * @param objectSizeEstimator
253 * the {@link ObjectSizeEstimator} that will be used to compute
254 * the size of the message
255 * @param timer
256 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
257 * @param checkInterval
258 * The delay between two computations of performances for
259 * channels or 0 if no stats are to be computed
260 */
261 protected AbstractTrafficShapingHandler(
262 ObjectSizeEstimator objectSizeEstimator, Timer timer,
263 long checkInterval) {
264 init(objectSizeEstimator, timer, 0, 0, checkInterval);
265 }
266
267 /**
268 * Change the underlying limitations and check interval.
269 *
270 * @param newWriteLimit
271 * @param newReadLimit
272 * @param newCheckInterval
273 */
274 public void configure(long newWriteLimit, long newReadLimit,
275 long newCheckInterval) {
276 configure(newWriteLimit, newReadLimit);
277 configure(newCheckInterval);
278 }
279
280 /**
281 * Change the underlying limitations.
282 *
283 * @param newWriteLimit
284 * @param newReadLimit
285 */
286 public void configure(long newWriteLimit, long newReadLimit) {
287 writeLimit = newWriteLimit;
288 readLimit = newReadLimit;
289 if (trafficCounter != null) {
290 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
291 }
292 }
293
294 /**
295 * Change the check interval.
296 *
297 * @param newCheckInterval
298 */
299 public void configure(long newCheckInterval) {
300 checkInterval = newCheckInterval;
301 if (trafficCounter != null) {
302 trafficCounter.configure(checkInterval);
303 }
304 }
305
306 /**
307 * Called each time the accounting is computed from the TrafficCounters.
308 * This method could be used for instance to implement almost real time accounting.
309 *
310 * @param counter
311 * the TrafficCounter that computes its performance
312 */
313 protected void doAccounting(TrafficCounter counter) {
314 // NOOP by default
315 }
316
317 /**
318 * Class to implement setReadable at fix time
319 */
320 private class ReopenReadTimerTask implements TimerTask {
321 ChannelHandlerContext ctx;
322 ReopenReadTimerTask(ChannelHandlerContext ctx) {
323 this.ctx = ctx;
324 }
325 public void run(Timeout timeoutArg) throws Exception {
326 //logger.warn("Start RRTT: "+release.get());
327 if (release.get()) {
328 return;
329 }
330 /*
331 logger.warn("WAKEUP! "+
332 (ctx != null && ctx.getChannel() != null &&
333 ctx.getChannel().isConnected()));
334 */
335 if (ctx != null && ctx.getChannel() != null &&
336 ctx.getChannel().isConnected()) {
337 //logger.warn(" setReadable TRUE: ");
338 // readSuspended = false;
339 ctx.setAttachment(null);
340 ctx.getChannel().setReadable(true);
341 }
342 }
343 }
344
345 /**
346 *
347 * @return the time that should be necessary to wait to respect limit. Can
348 * be negative time
349 */
350 private static long getTimeToWait(long limit, long bytes, long lastTime,
351 long curtime) {
352 long interval = curtime - lastTime;
353 if (interval == 0) {
354 // Time is too short, so just lets continue
355 return 0;
356 }
357 return (bytes * 1000 / limit - interval) / 10 * 10;
358 }
359
360 @Override
361 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
362 throws Exception {
363 try {
364 long curtime = System.currentTimeMillis();
365 long size = objectSizeEstimator.estimateSize(e.getMessage());
366 if (trafficCounter != null) {
367 trafficCounter.bytesRecvFlowControl(ctx, size);
368 if (readLimit == 0) {
369 // no action
370 return;
371 }
372 // compute the number of ms to wait before reopening the channel
373 long wait = getTimeToWait(readLimit,
374 trafficCounter.getCurrentReadBytes(),
375 trafficCounter.getLastTime(), curtime);
376 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
377 // time in order to
378 Channel channel = ctx.getChannel();
379 // try to limit the traffic
380 if (channel != null && channel.isConnected()) {
381 // Channel version
382 if (timer == null) {
383 // Sleep since no executor
384 // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
385 if (release.get()) {
386 return;
387 }
388 Thread.sleep(wait);
389 return;
390 }
391 if (ctx.getAttachment() == null) {
392 // readSuspended = true;
393 ctx.setAttachment(Boolean.TRUE);
394 channel.setReadable(false);
395 // logger.warn("Read will wakeup after "+wait+" ms "+this);
396 TimerTask timerTask = new ReopenReadTimerTask(ctx);
397 timeout = timer.newTimeout(timerTask, wait,
398 TimeUnit.MILLISECONDS);
399 } else {
400 // should be waiting: but can occurs sometime so as
401 // a FIX
402 // logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
403 if (release.get()) {
404 return;
405 }
406 Thread.sleep(wait);
407 }
408 } else {
409 // Not connected or no channel
410 // logger.warn("Read sleep "+wait+" ms for "+this);
411 if (release.get()) {
412 return;
413 }
414 Thread.sleep(wait);
415 }
416 }
417 }
418 } finally {
419 // The message is then just passed to the next handler
420 super.messageReceived(ctx, e);
421 }
422 }
423
424 @Override
425 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
426 throws Exception {
427 try {
428 long curtime = System.currentTimeMillis();
429 long size = objectSizeEstimator.estimateSize(e.getMessage());
430 if (trafficCounter != null) {
431 trafficCounter.bytesWriteFlowControl(size);
432 if (writeLimit == 0) {
433 return;
434 }
435 // compute the number of ms to wait before continue with the
436 // channel
437 long wait = getTimeToWait(writeLimit,
438 trafficCounter.getCurrentWrittenBytes(),
439 trafficCounter.getLastTime(), curtime);
440 if (wait >= MINIMAL_WAIT) {
441 // Global or Channel
442 if (release.get()) {
443 return;
444 }
445 Thread.sleep(wait);
446 }
447 }
448 } finally {
449 // The message is then just passed to the next handler
450 super.writeRequested(ctx, e);
451 }
452 }
453 @Override
454 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
455 throws Exception {
456 if (e instanceof ChannelStateEvent) {
457 ChannelStateEvent cse = (ChannelStateEvent) e;
458 if (cse.getState() == ChannelState.INTEREST_OPS &&
459 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
460
461 // setReadable(true) requested
462 boolean readSuspended = ctx.getAttachment() != null;
463 if (readSuspended) {
464 // Drop the request silently if this handler has
465 // set the flag.
466 e.getFuture().setSuccess();
467 return;
468 }
469 }
470 }
471 super.handleDownstream(ctx, e);
472 }
473
474 /**
475 *
476 * @return the current TrafficCounter (if
477 * channel is still connected)
478 */
479 public TrafficCounter getTrafficCounter() {
480 return trafficCounter;
481 }
482
483 public void releaseExternalResources() {
484 if (trafficCounter != null) {
485 trafficCounter.stop();
486 }
487 release.set(true);
488 if (timeout != null) {
489 timeout.cancel();
490 }
491 timer.stop();
492 }
493
494 @Override
495 public String toString() {
496 return "TrafficShaping with Write Limit: " + writeLimit +
497 " Read Limit: " + readLimit + " and Counter: " +
498 (trafficCounter != null? trafficCounter.toString() : "none");
499 }
500 }