1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.traffic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.channel.ChannelHandler.Sharable;
23 import io.netty.util.concurrent.EventExecutor;
24 import io.netty.util.internal.PlatformDependent;
25
26 import java.util.ArrayDeque;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 /**
33 * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
34 * traffic shaping, that is to say a global limitation of the bandwidth, whatever
35 * the number of opened channels.</p>
36 * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p>
37 *
38 * <p>The general use should be as follow:</p>
39 * <ul>
40 * <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
41 * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt></p>
42 * <p>The executor could be the underlying IO worker pool</p>
43 * <p><tt>pipeline.addLast(myHandler);</tt></p>
44 *
45 * <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
46 * and shared among all channels as the counter must be shared among all channels.</b></p>
47 *
48 * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
49 * or the check interval (in millisecond) that represents the delay between two computations of the
50 * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
51 *
52 * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
53 * it is recommended to set a positive value, even if it is high since the precision of the
54 * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
55 * the less precise the traffic shaping will be. It is suggested as higher value something close
56 * to 5 or 10 minutes.</p>
57 *
58 * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
59 * </li>
60 * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
61 * {@code channelWritabilityChanged(ctx)} to handle writability, or through
62 * {@code future.addListener(new GenericFutureListener())} on the future returned by
63 * {@code ctx.write()}.</li>
64 * <li><p>You shall also consider to have object size in read or write operations relatively adapted to
65 * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
66 * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
67 * <li><p>Some configuration methods will be taken as best effort, meaning
68 * that all already scheduled traffics will not be
69 * changed, but only applied to new traffics.</p>
70 * So the expected usage of those methods are to be used not too often,
71 * accordingly to the traffic shaping configuration.</li>
72 * </ul>
73 *
74 * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
75 * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
76 */
77 @Sharable
78 public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
79 /**
80 * All queues per channel
81 */
82 private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
83
84 /**
85 * Global queues size
86 */
87 private final AtomicLong queuesSize = new AtomicLong();
88
89 /**
90 * Max size in the list before proposing to stop writing new objects from next handlers
91 * for all channel (global)
92 */
93 long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
94
95 private static final class PerChannel {
96 ArrayDeque<ToSend> messagesQueue;
97 long queueSize;
98 long lastWriteTimestamp;
99 long lastReadTimestamp;
100 }
101
102 /**
103 * Create the global TrafficCounter.
104 */
105 void createGlobalTrafficCounter(ScheduledExecutorService executor) {
106 if (executor == null) {
107 throw new NullPointerException("executor");
108 }
109 TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval);
110 setTrafficCounter(tc);
111 tc.start();
112 }
113
114 @Override
115 protected int userDefinedWritabilityIndex() {
116 return AbstractTrafficShapingHandler.GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
117 }
118
119 /**
120 * Create a new instance.
121 *
122 * @param executor
123 * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
124 * @param writeLimit
125 * 0 or a limit in bytes/s
126 * @param readLimit
127 * 0 or a limit in bytes/s
128 * @param checkInterval
129 * The delay between two computations of performances for
130 * channels or 0 if no stats are to be computed.
131 * @param maxTime
132 * The maximum delay to wait in case of traffic excess.
133 */
134 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
135 long checkInterval, long maxTime) {
136 super(writeLimit, readLimit, checkInterval, maxTime);
137 createGlobalTrafficCounter(executor);
138 }
139
140 /**
141 * Create a new instance using
142 * default max time as delay allowed value of 15000 ms.
143 *
144 * @param executor
145 * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
146 * @param writeLimit
147 * 0 or a limit in bytes/s
148 * @param readLimit
149 * 0 or a limit in bytes/s
150 * @param checkInterval
151 * The delay between two computations of performances for
152 * channels or 0 if no stats are to be computed.
153 */
154 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
155 long readLimit, long checkInterval) {
156 super(writeLimit, readLimit, checkInterval);
157 createGlobalTrafficCounter(executor);
158 }
159
160 /**
161 * Create a new instance using default Check Interval value of 1000 ms and
162 * default max time as delay allowed value of 15000 ms.
163 *
164 * @param executor
165 * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
166 * @param writeLimit
167 * 0 or a limit in bytes/s
168 * @param readLimit
169 * 0 or a limit in bytes/s
170 */
171 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
172 long readLimit) {
173 super(writeLimit, readLimit);
174 createGlobalTrafficCounter(executor);
175 }
176
177 /**
178 * Create a new instance using
179 * default max time as delay allowed value of 15000 ms and no limit.
180 *
181 * @param executor
182 * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
183 * @param checkInterval
184 * The delay between two computations of performances for
185 * channels or 0 if no stats are to be computed.
186 */
187 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
188 super(checkInterval);
189 createGlobalTrafficCounter(executor);
190 }
191
192 /**
193 * Create a new instance using default Check Interval value of 1000 ms and
194 * default max time as delay allowed value of 15000 ms and no limit.
195 *
196 * @param executor
197 * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
198 */
199 public GlobalTrafficShapingHandler(EventExecutor executor) {
200 createGlobalTrafficCounter(executor);
201 }
202
203 /**
204 * @return the maxGlobalWriteSize default value being 400 MB.
205 */
206 public long getMaxGlobalWriteSize() {
207 return maxGlobalWriteSize;
208 }
209
210 /**
211 * Note the change will be taken as best effort, meaning
212 * that all already scheduled traffics will not be
213 * changed, but only applied to new traffics.<br>
214 * So the expected usage of this method is to be used not too often,
215 * accordingly to the traffic shaping configuration.
216 *
217 * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
218 * globally for all channels before write suspended is set,
219 * default value being 400 MB.
220 */
221 public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
222 this.maxGlobalWriteSize = maxGlobalWriteSize;
223 }
224
225 /**
226 * @return the global size of the buffers for all queues.
227 */
228 public long queuesSize() {
229 return queuesSize.get();
230 }
231
232 /**
233 * Release all internal resources of this instance.
234 */
235 public final void release() {
236 trafficCounter.stop();
237 }
238
239 private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
240 // ensure creation is limited to one thread per channel
241 Channel channel = ctx.channel();
242 Integer key = channel.hashCode();
243 PerChannel perChannel = channelQueues.get(key);
244 if (perChannel == null) {
245 perChannel = new PerChannel();
246 perChannel.messagesQueue = new ArrayDeque<ToSend>();
247 perChannel.queueSize = 0L;
248 perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
249 perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
250 channelQueues.put(key, perChannel);
251 }
252 return perChannel;
253 }
254
255 @Override
256 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
257 getOrSetPerChannel(ctx);
258 super.handlerAdded(ctx);
259 }
260
261 @Override
262 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
263 Channel channel = ctx.channel();
264 Integer key = channel.hashCode();
265 PerChannel perChannel = channelQueues.remove(key);
266 if (perChannel != null) {
267 // write operations need synchronization
268 synchronized (perChannel) {
269 if (channel.isActive()) {
270 for (ToSend toSend : perChannel.messagesQueue) {
271 long size = calculateSize(toSend.toSend);
272 trafficCounter.bytesRealWriteFlowControl(size);
273 perChannel.queueSize -= size;
274 queuesSize.addAndGet(-size);
275 ctx.write(toSend.toSend, toSend.promise);
276 }
277 } else {
278 queuesSize.addAndGet(-perChannel.queueSize);
279 for (ToSend toSend : perChannel.messagesQueue) {
280 if (toSend.toSend instanceof ByteBuf) {
281 ((ByteBuf) toSend.toSend).release();
282 }
283 }
284 }
285 perChannel.messagesQueue.clear();
286 }
287 }
288 releaseWriteSuspended(ctx);
289 releaseReadSuspended(ctx);
290 super.handlerRemoved(ctx);
291 }
292
293 @Override
294 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
295 Integer key = ctx.channel().hashCode();
296 PerChannel perChannel = channelQueues.get(key);
297 if (perChannel != null) {
298 if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
299 wait = maxTime;
300 }
301 }
302 return wait;
303 }
304
305 @Override
306 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
307 Integer key = ctx.channel().hashCode();
308 PerChannel perChannel = channelQueues.get(key);
309 if (perChannel != null) {
310 perChannel.lastReadTimestamp = now;
311 }
312 }
313
314 private static final class ToSend {
315 final long relativeTimeAction;
316 final Object toSend;
317 final long size;
318 final ChannelPromise promise;
319
320 private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
321 relativeTimeAction = delay;
322 this.toSend = toSend;
323 this.size = size;
324 this.promise = promise;
325 }
326 }
327
328 @Override
329 void submitWrite(final ChannelHandlerContext ctx, final Object msg,
330 final long size, final long writedelay, final long now,
331 final ChannelPromise promise) {
332 Channel channel = ctx.channel();
333 Integer key = channel.hashCode();
334 PerChannel perChannel = channelQueues.get(key);
335 if (perChannel == null) {
336 // in case write occurs before handlerAdded is raised for this handler
337 // imply a synchronized only if needed
338 perChannel = getOrSetPerChannel(ctx);
339 }
340 final ToSend newToSend;
341 long delay = writedelay;
342 boolean globalSizeExceeded = false;
343 // write operations need synchronization
344 synchronized (perChannel) {
345 if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
346 trafficCounter.bytesRealWriteFlowControl(size);
347 ctx.write(msg, promise);
348 perChannel.lastWriteTimestamp = now;
349 return;
350 }
351 if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
352 delay = maxTime;
353 }
354 newToSend = new ToSend(delay + now, msg, size, promise);
355 perChannel.messagesQueue.addLast(newToSend);
356 perChannel.queueSize += size;
357 queuesSize.addAndGet(size);
358 checkWriteSuspend(ctx, delay, perChannel.queueSize);
359 if (queuesSize.get() > maxGlobalWriteSize) {
360 globalSizeExceeded = true;
361 }
362 }
363 if (globalSizeExceeded) {
364 setUserDefinedWritability(ctx, false);
365 }
366 final long futureNow = newToSend.relativeTimeAction;
367 final PerChannel forSchedule = perChannel;
368 ctx.executor().schedule(new Runnable() {
369 @Override
370 public void run() {
371 sendAllValid(ctx, forSchedule, futureNow);
372 }
373 }, delay, TimeUnit.MILLISECONDS);
374 }
375
376 private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
377 // write operations need synchronization
378 synchronized (perChannel) {
379 ToSend newToSend = perChannel.messagesQueue.pollFirst();
380 for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
381 if (newToSend.relativeTimeAction <= now) {
382 long size = newToSend.size;
383 trafficCounter.bytesRealWriteFlowControl(size);
384 perChannel.queueSize -= size;
385 queuesSize.addAndGet(-size);
386 ctx.write(newToSend.toSend, newToSend.promise);
387 perChannel.lastWriteTimestamp = now;
388 } else {
389 perChannel.messagesQueue.addFirst(newToSend);
390 break;
391 }
392 }
393 if (perChannel.messagesQueue.isEmpty()) {
394 releaseWriteSuspended(ctx);
395 }
396 }
397 ctx.flush();
398 }
399 }