1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelEvent;
20 import org.jboss.netty.channel.ChannelHandlerContext;
21 import org.jboss.netty.channel.ChannelState;
22 import org.jboss.netty.channel.ChannelStateEvent;
23 import org.jboss.netty.channel.MessageEvent;
24 import org.jboss.netty.channel.SimpleChannelHandler;
25 import org.jboss.netty.logging.InternalLogger;
26 import org.jboss.netty.logging.InternalLoggerFactory;
27 import org.jboss.netty.util.DefaultObjectSizeEstimator;
28 import org.jboss.netty.util.ExternalResourceReleasable;
29 import org.jboss.netty.util.ObjectSizeEstimator;
30 import org.jboss.netty.util.Timeout;
31 import org.jboss.netty.util.Timer;
32 import org.jboss.netty.util.TimerTask;
33
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37 /**
38 * AbstractTrafficShapingHandler allows to limit the global bandwidth
39 * (see {@link GlobalTrafficShapingHandler}) or per session
40 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
41 * It allows too to implement an almost real time monitoring of the bandwidth using
42 * the monitors from {@link TrafficCounter} that will call back every checkInterval
43 * the method doAccounting of this handler.<br>
44 * <br>
45 *
46 * An {@link ObjectSizeEstimator} can be passed at construction to specify what
47 * is the size of the object to be read or write accordingly to the type of
48 * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
49 *
50 * If you want for any particular reasons to stop the monitoring (accounting) or to change
51 * the read/write limit or the check interval, several methods allow that for you:<br>
52 * <ul>
53 * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
54 * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
55 * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
56 * <li></li>
57 * </ul>
58 */
59 public abstract class AbstractTrafficShapingHandler extends
60 SimpleChannelHandler implements ExternalResourceReleasable {
61 /**
62 * Internal logger
63 */
64 static InternalLogger logger = InternalLoggerFactory
65 .getInstance(AbstractTrafficShapingHandler.class);
66
67 /**
68 * Default delay between two checks: 1s
69 */
70 public static final long DEFAULT_CHECK_INTERVAL = 1000;
71
72 /**
73 * Default minimal time to wait
74 */
75 private static final long MINIMAL_WAIT = 10;
76
77 /**
78 * Traffic Counter
79 */
80 protected TrafficCounter trafficCounter;
81
82 /**
83 * ObjectSizeEstimator
84 */
85 private ObjectSizeEstimator objectSizeEstimator;
86
87 /**
88 * Timer to associated to any TrafficCounter
89 */
90 protected Timer timer;
91
92 /**
93 * used in releaseExternalResources() to cancel the timer
94 */
95 private volatile Timeout timeout;
96
97 /**
98 * Limit in B/s to apply to write
99 */
100 private long writeLimit;
101
102 /**
103 * Limit in B/s to apply to read
104 */
105 private long readLimit;
106
107 /**
108 * Delay between two performance snapshots
109 */
110 protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
111
112 /**
113 * Boolean associated with the release of this TrafficShapingHandler.
114 * It will be true only once when the releaseExternalRessources is called
115 * to prevent waiting when shutdown.
116 */
117 final AtomicBoolean release = new AtomicBoolean(false);
118
119 private void init(ObjectSizeEstimator newObjectSizeEstimator,
120 Timer newTimer, long newWriteLimit, long newReadLimit,
121 long newCheckInterval) {
122 objectSizeEstimator = newObjectSizeEstimator;
123 timer = newTimer;
124 writeLimit = newWriteLimit;
125 readLimit = newReadLimit;
126 checkInterval = newCheckInterval;
127 //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
128 }
129
130 /**
131 *
132 * @param newTrafficCounter the TrafficCounter to set
133 */
134 void setTrafficCounter(TrafficCounter newTrafficCounter) {
135 trafficCounter = newTrafficCounter;
136 }
137
138 /**
139 * Constructor using default {@link ObjectSizeEstimator}
140 *
141 * @param timer
142 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
143 * @param writeLimit
144 * 0 or a limit in bytes/s
145 * @param readLimit
146 * 0 or a limit in bytes/s
147 * @param checkInterval
148 * The delay between two computations of performances for
149 * channels or 0 if no stats are to be computed
150 */
151 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
152 long readLimit, long checkInterval) {
153 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
154 }
155
156 /**
157 * Constructor using the specified ObjectSizeEstimator
158 *
159 * @param objectSizeEstimator
160 * the {@link ObjectSizeEstimator} that will be used to compute
161 * the size of the message
162 * @param timer
163 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
164 * @param writeLimit
165 * 0 or a limit in bytes/s
166 * @param readLimit
167 * 0 or a limit in bytes/s
168 * @param checkInterval
169 * The delay between two computations of performances for
170 * channels or 0 if no stats are to be computed
171 */
172 protected AbstractTrafficShapingHandler(
173 ObjectSizeEstimator objectSizeEstimator, Timer timer,
174 long writeLimit, long readLimit, long checkInterval) {
175 init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
176 }
177
178 /**
179 * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
180 *
181 * @param timer
182 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
183 * @param writeLimit
184 * 0 or a limit in bytes/s
185 * @param readLimit
186 * 0 or a limit in bytes/s
187 */
188 protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
189 long readLimit) {
190 init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
191 }
192
193 /**
194 * Constructor using the specified ObjectSizeEstimator and using default Check Interval
195 *
196 * @param objectSizeEstimator
197 * the {@link ObjectSizeEstimator} that will be used to compute
198 * the size of the message
199 * @param timer
200 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
201 * @param writeLimit
202 * 0 or a limit in bytes/s
203 * @param readLimit
204 * 0 or a limit in bytes/s
205 */
206 protected AbstractTrafficShapingHandler(
207 ObjectSizeEstimator objectSizeEstimator, Timer timer,
208 long writeLimit, long readLimit) {
209 init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
210 }
211
212 /**
213 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
214 *
215 * @param timer
216 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
217 */
218 protected AbstractTrafficShapingHandler(Timer timer) {
219 init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
220 }
221
222 /**
223 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
224 *
225 * @param objectSizeEstimator
226 * the {@link ObjectSizeEstimator} that will be used to compute
227 * the size of the message
228 * @param timer
229 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
230 */
231 protected AbstractTrafficShapingHandler(
232 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
233 init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
234 }
235
236 /**
237 * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
238 *
239 * @param timer
240 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
241 * @param checkInterval
242 * The delay between two computations of performances for
243 * channels or 0 if no stats are to be computed
244 */
245 protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
246 init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
247 }
248
249 /**
250 * Constructor using the specified ObjectSizeEstimator and using NO LIMIT
251 *
252 * @param objectSizeEstimator
253 * the {@link ObjectSizeEstimator} that will be used to compute
254 * the size of the message
255 * @param timer
256 * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
257 * @param checkInterval
258 * The delay between two computations of performances for
259 * channels or 0 if no stats are to be computed
260 */
261 protected AbstractTrafficShapingHandler(
262 ObjectSizeEstimator objectSizeEstimator, Timer timer,
263 long checkInterval) {
264 init(objectSizeEstimator, timer, 0, 0, checkInterval);
265 }
266
267 /**
268 * Change the underlying limitations and check interval.
269 */
270 public void configure(long newWriteLimit, long newReadLimit,
271 long newCheckInterval) {
272 configure(newWriteLimit, newReadLimit);
273 configure(newCheckInterval);
274 }
275
276 /**
277 * Change the underlying limitations.
278 */
279 public void configure(long newWriteLimit, long newReadLimit) {
280 writeLimit = newWriteLimit;
281 readLimit = newReadLimit;
282 if (trafficCounter != null) {
283 trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
284 }
285 }
286
287 /**
288 * Change the check interval.
289 */
290 public void configure(long newCheckInterval) {
291 checkInterval = newCheckInterval;
292 if (trafficCounter != null) {
293 trafficCounter.configure(checkInterval);
294 }
295 }
296
297 /**
298 * Called each time the accounting is computed from the TrafficCounters.
299 * This method could be used for instance to implement almost real time accounting.
300 *
301 * @param counter
302 * the TrafficCounter that computes its performance
303 */
304 protected void doAccounting(TrafficCounter counter) {
305 // NOOP by default
306 }
307
308 /**
309 * Class to implement setReadable at fix time
310 */
311 private class ReopenReadTimerTask implements TimerTask {
312 final ChannelHandlerContext ctx;
313 ReopenReadTimerTask(ChannelHandlerContext ctx) {
314 this.ctx = ctx;
315 }
316 public void run(Timeout timeoutArg) throws Exception {
317 //logger.warn("Start RRTT: "+release.get());
318 if (release.get()) {
319 return;
320 }
321 /*
322 logger.warn("WAKEUP! "+
323 (ctx != null && ctx.getChannel() != null &&
324 ctx.getChannel().isConnected()));
325 */
326 if (ctx != null && ctx.getChannel() != null &&
327 ctx.getChannel().isConnected()) {
328 //logger.warn(" setReadable TRUE: ");
329 // readSuspended = false;
330 ctx.setAttachment(null);
331 ctx.getChannel().setReadable(true);
332 }
333 }
334 }
335
336 /**
337 * @return the time that should be necessary to wait to respect limit. Can be negative time
338 */
339 private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) {
340 long interval = curtime - lastTime;
341 if (interval <= 0) {
342 // Time is too short, so just lets continue
343 return 0;
344 }
345 return (bytes * 1000 / limit - interval) / 10 * 10;
346 }
347
348 @Override
349 public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
350 throws Exception {
351 try {
352 long curtime = System.currentTimeMillis();
353 long size = objectSizeEstimator.estimateSize(evt.getMessage());
354 if (trafficCounter != null) {
355 trafficCounter.bytesRecvFlowControl(size);
356 if (readLimit == 0) {
357 // no action
358 return;
359 }
360 // compute the number of ms to wait before reopening the channel
361 long wait = getTimeToWait(readLimit,
362 trafficCounter.getCurrentReadBytes(),
363 trafficCounter.getLastTime(), curtime);
364 if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
365 // time in order to
366 Channel channel = ctx.getChannel();
367 // try to limit the traffic
368 if (channel != null && channel.isConnected()) {
369 // Channel version
370 if (timer == null) {
371 // Sleep since no executor
372 // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
373 if (release.get()) {
374 return;
375 }
376 Thread.sleep(wait);
377 return;
378 }
379 if (ctx.getAttachment() == null) {
380 // readSuspended = true;
381 ctx.setAttachment(Boolean.TRUE);
382 channel.setReadable(false);
383 // logger.warn("Read will wakeup after "+wait+" ms "+this);
384 TimerTask timerTask = new ReopenReadTimerTask(ctx);
385 timeout = timer.newTimeout(timerTask, wait,
386 TimeUnit.MILLISECONDS);
387 } else {
388 // should be waiting: but can occurs sometime so as
389 // a FIX
390 // logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
391 if (release.get()) {
392 return;
393 }
394 Thread.sleep(wait);
395 }
396 } else {
397 // Not connected or no channel
398 // logger.warn("Read sleep "+wait+" ms for "+this);
399 if (release.get()) {
400 return;
401 }
402 Thread.sleep(wait);
403 }
404 }
405 }
406 } finally {
407 // The message is then just passed to the next handler
408 super.messageReceived(ctx, evt);
409 }
410 }
411
412 @Override
413 public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
414 throws Exception {
415 try {
416 long curtime = System.currentTimeMillis();
417 long size = objectSizeEstimator.estimateSize(evt.getMessage());
418 if (trafficCounter != null) {
419 trafficCounter.bytesWriteFlowControl(size);
420 if (writeLimit == 0) {
421 return;
422 }
423 // compute the number of ms to wait before continue with the
424 // channel
425 long wait = getTimeToWait(writeLimit,
426 trafficCounter.getCurrentWrittenBytes(),
427 trafficCounter.getLastTime(), curtime);
428 if (wait >= MINIMAL_WAIT) {
429 // Global or Channel
430 if (release.get()) {
431 return;
432 }
433 Thread.sleep(wait);
434 }
435 }
436 } finally {
437 // The message is then just passed to the next handler
438 super.writeRequested(ctx, evt);
439 }
440 }
441 @Override
442 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
443 throws Exception {
444 if (e instanceof ChannelStateEvent) {
445 ChannelStateEvent cse = (ChannelStateEvent) e;
446 if (cse.getState() == ChannelState.INTEREST_OPS &&
447 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
448
449 // setReadable(true) requested
450 boolean readSuspended = ctx.getAttachment() != null;
451 if (readSuspended) {
452 // Drop the request silently if this handler has
453 // set the flag.
454 e.getFuture().setSuccess();
455 return;
456 }
457 }
458 }
459 super.handleDownstream(ctx, e);
460 }
461
462 /**
463 *
464 * @return the current TrafficCounter (if
465 * channel is still connected)
466 */
467 public TrafficCounter getTrafficCounter() {
468 return trafficCounter;
469 }
470
471 public void releaseExternalResources() {
472 if (trafficCounter != null) {
473 trafficCounter.stop();
474 }
475 release.set(true);
476 if (timeout != null) {
477 timeout.cancel();
478 }
479 timer.stop();
480 }
481
482 @Override
483 public String toString() {
484 return "TrafficShaping with Write Limit: " + writeLimit +
485 " Read Limit: " + readLimit + " and Counter: " +
486 (trafficCounter != null? trafficCounter.toString() : "none");
487 }
488 }