1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.traffic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandler.Sharable;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.util.concurrent.EventExecutor;
24 import io.netty.util.internal.ObjectUtil;
25
26 import java.util.ArrayDeque;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 /**
34 * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
35 * traffic shaping, that is to say a global limitation of the bandwidth, whatever
36 * the number of opened channels.</p>
37 * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p>
38 *
39 * <p>The general use should be as follow:</p>
40 * <ul>
41 * <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
42 * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt></p>
43 * <p>The executor could be the underlying IO worker pool</p>
44 * <p><tt>pipeline.addLast(myHandler);</tt></p>
45 *
46 * <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
47 * and shared among all channels as the counter must be shared among all channels.</b></p>
48 *
49 * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
50 * or the check interval (in millisecond) that represents the delay between two computations of the
51 * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
52 *
53 * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
54 * it is recommended to set a positive value, even if it is high since the precision of the
55 * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
56 * the less precise the traffic shaping will be. It is suggested as higher value something close
57 * to 5 or 10 minutes.</p>
58 *
59 * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
60 * </li>
61 * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
62 * {@code channelWritabilityChanged(ctx)} to handle writability, or through
63 * {@code future.addListener(new GenericFutureListener())} on the future returned by
64 * {@code ctx.write()}.</li>
65 * <li><p>You shall also consider to have object size in read or write operations relatively adapted to
66 * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
67 * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
68 * <li><p>Some configuration methods will be taken as best effort, meaning
69 * that all already scheduled traffics will not be
70 * changed, but only applied to new traffics.</p>
71 * So the expected usage of those methods are to be used not too often,
72 * accordingly to the traffic shaping configuration.</li>
73 * </ul>
74 *
75 * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
76 * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
77 */
78 @Sharable
79 public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
80 /**
81 * All queues per channel
82 */
83 private final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<>();
84
85 /**
86 * Global queues size
87 */
88 private final AtomicLong queuesSize = new AtomicLong();
89
90 /**
91 * Max size in the list before proposing to stop writing new objects from next handlers
92 * for all channel (global)
93 */
94 long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
95
96 private static final class PerChannel {
97 ArrayDeque<ToSend> messagesQueue;
98 long queueSize;
99 long lastWriteTimestamp;
100 long lastReadTimestamp;
101 }
102
103 /**
104 * Create the global TrafficCounter.
105 */
106 void createGlobalTrafficCounter(ScheduledExecutorService executor) {
107 TrafficCounter tc = new TrafficCounter(this,
108 ObjectUtil.checkNotNull(executor, "executor"),
109 "GlobalTC",
110 checkInterval);
111
112 setTrafficCounter(tc);
113 tc.start();
114 }
115
116 @Override
117 protected int userDefinedWritabilityIndex() {
118 return AbstractTrafficShapingHandler.GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
119 }
120
121 /**
122 * Create a new instance.
123 *
124 * @param executor
125 * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
126 * @param writeLimit
127 * 0 or a limit in bytes/s
128 * @param readLimit
129 * 0 or a limit in bytes/s
130 * @param checkInterval
131 * The delay between two computations of performances for
132 * channels or 0 if no stats are to be computed.
133 * @param maxTime
134 * The maximum delay to wait in case of traffic excess.
135 */
136 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
137 long checkInterval, long maxTime) {
138 super(writeLimit, readLimit, checkInterval, maxTime);
139 createGlobalTrafficCounter(executor);
140 }
141
142 /**
143 * Create a new instance using
144 * default max time as delay allowed value of 15000 ms.
145 *
146 * @param executor
147 * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
148 * @param writeLimit
149 * 0 or a limit in bytes/s
150 * @param readLimit
151 * 0 or a limit in bytes/s
152 * @param checkInterval
153 * The delay between two computations of performances for
154 * channels or 0 if no stats are to be computed.
155 */
156 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
157 long readLimit, long checkInterval) {
158 super(writeLimit, readLimit, checkInterval);
159 createGlobalTrafficCounter(executor);
160 }
161
162 /**
163 * Create a new instance using default Check Interval value of 1000 ms and
164 * default max time as delay allowed value of 15000 ms.
165 *
166 * @param executor
167 * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
168 * @param writeLimit
169 * 0 or a limit in bytes/s
170 * @param readLimit
171 * 0 or a limit in bytes/s
172 */
173 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
174 long readLimit) {
175 super(writeLimit, readLimit);
176 createGlobalTrafficCounter(executor);
177 }
178
179 /**
180 * Create a new instance using
181 * default max time as delay allowed value of 15000 ms and no limit.
182 *
183 * @param executor
184 * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
185 * @param checkInterval
186 * The delay between two computations of performances for
187 * channels or 0 if no stats are to be computed.
188 */
189 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
190 super(checkInterval);
191 createGlobalTrafficCounter(executor);
192 }
193
194 /**
195 * Create a new instance using default Check Interval value of 1000 ms and
196 * default max time as delay allowed value of 15000 ms and no limit.
197 *
198 * @param executor
199 * the {@link EventExecutor} to use for the {@link TrafficCounter}.
200 */
201 public GlobalTrafficShapingHandler(EventExecutor executor) {
202 createGlobalTrafficCounter(executor);
203 }
204
205 /**
206 * @return the maxGlobalWriteSize default value being 400 MB.
207 */
208 public long getMaxGlobalWriteSize() {
209 return maxGlobalWriteSize;
210 }
211
212 /**
213 * Note the change will be taken as best effort, meaning
214 * that all already scheduled traffics will not be
215 * changed, but only applied to new traffics.<br>
216 * So the expected usage of this method is to be used not too often,
217 * accordingly to the traffic shaping configuration.
218 *
219 * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
220 * globally for all channels before write suspended is set,
221 * default value being 400 MB.
222 */
223 public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
224 this.maxGlobalWriteSize = maxGlobalWriteSize;
225 }
226
227 /**
228 * @return the global size of the buffers for all queues.
229 */
230 public long queuesSize() {
231 return queuesSize.get();
232 }
233
234 /**
235 * Release all internal resources of this instance.
236 */
237 public final void release() {
238 trafficCounter.stop();
239 }
240
241 private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
242 // ensure creation is limited to one thread per channel
243 Channel channel = ctx.channel();
244 Integer key = channel.hashCode();
245 PerChannel perChannel = channelQueues.get(key);
246 if (perChannel == null) {
247 perChannel = new PerChannel();
248 perChannel.messagesQueue = new ArrayDeque<ToSend>();
249 perChannel.queueSize = 0L;
250 perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
251 perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
252 channelQueues.put(key, perChannel);
253 }
254 return perChannel;
255 }
256
257 @Override
258 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
259 getOrSetPerChannel(ctx);
260 super.handlerAdded(ctx);
261 }
262
263 @Override
264 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
265 Channel channel = ctx.channel();
266 Integer key = channel.hashCode();
267 PerChannel perChannel = channelQueues.remove(key);
268 if (perChannel != null) {
269 // write operations need synchronization
270 synchronized (perChannel) {
271 if (channel.isActive()) {
272 for (ToSend toSend : perChannel.messagesQueue) {
273 long size = calculateSize(toSend.toSend);
274 trafficCounter.bytesRealWriteFlowControl(size);
275 perChannel.queueSize -= size;
276 queuesSize.addAndGet(-size);
277 ctx.write(toSend.toSend, toSend.promise);
278 }
279 } else {
280 queuesSize.addAndGet(-perChannel.queueSize);
281 for (ToSend toSend : perChannel.messagesQueue) {
282 if (toSend.toSend instanceof ByteBuf) {
283 ((ByteBuf) toSend.toSend).release();
284 }
285 }
286 }
287 perChannel.messagesQueue.clear();
288 }
289 }
290 releaseWriteSuspended(ctx);
291 releaseReadSuspended(ctx);
292 super.handlerRemoved(ctx);
293 }
294
295 @Override
296 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
297 Integer key = ctx.channel().hashCode();
298 PerChannel perChannel = channelQueues.get(key);
299 if (perChannel != null) {
300 if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
301 wait = maxTime;
302 }
303 }
304 return wait;
305 }
306
307 @Override
308 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
309 Integer key = ctx.channel().hashCode();
310 PerChannel perChannel = channelQueues.get(key);
311 if (perChannel != null) {
312 perChannel.lastReadTimestamp = now;
313 }
314 }
315
316 private static final class ToSend {
317 final long relativeTimeAction;
318 final Object toSend;
319 final long size;
320 final ChannelPromise promise;
321
322 private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
323 relativeTimeAction = delay;
324 this.toSend = toSend;
325 this.size = size;
326 this.promise = promise;
327 }
328 }
329
330 @Override
331 void submitWrite(final ChannelHandlerContext ctx, final Object msg,
332 final long size, final long writedelay, final long now,
333 final ChannelPromise promise) {
334 Channel channel = ctx.channel();
335 Integer key = channel.hashCode();
336 PerChannel perChannel = channelQueues.get(key);
337 if (perChannel == null) {
338 // in case write occurs before handlerAdded is raised for this handler
339 // imply a synchronized only if needed
340 perChannel = getOrSetPerChannel(ctx);
341 }
342 final ToSend newToSend;
343 long delay = writedelay;
344 boolean globalSizeExceeded = false;
345 // write operations need synchronization
346 synchronized (perChannel) {
347 if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
348 trafficCounter.bytesRealWriteFlowControl(size);
349 ctx.write(msg, promise);
350 perChannel.lastWriteTimestamp = now;
351 return;
352 }
353 if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
354 delay = maxTime;
355 }
356 newToSend = new ToSend(delay + now, msg, size, promise);
357 perChannel.messagesQueue.addLast(newToSend);
358 perChannel.queueSize += size;
359 queuesSize.addAndGet(size);
360 checkWriteSuspend(ctx, delay, perChannel.queueSize);
361 if (queuesSize.get() > maxGlobalWriteSize) {
362 globalSizeExceeded = true;
363 }
364 }
365 if (globalSizeExceeded) {
366 setUserDefinedWritability(ctx, false);
367 }
368 final long futureNow = newToSend.relativeTimeAction;
369 final PerChannel forSchedule = perChannel;
370 ctx.executor().schedule(new Runnable() {
371 @Override
372 public void run() {
373 sendAllValid(ctx, forSchedule, futureNow);
374 }
375 }, delay, TimeUnit.MILLISECONDS);
376 }
377
378 private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
379 // write operations need synchronization
380 synchronized (perChannel) {
381 ToSend newToSend = perChannel.messagesQueue.pollFirst();
382 for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
383 if (newToSend.relativeTimeAction <= now) {
384 long size = newToSend.size;
385 trafficCounter.bytesRealWriteFlowControl(size);
386 perChannel.queueSize -= size;
387 queuesSize.addAndGet(-size);
388 ctx.write(newToSend.toSend, newToSend.promise);
389 perChannel.lastWriteTimestamp = now;
390 } else {
391 perChannel.messagesQueue.addFirst(newToSend);
392 break;
393 }
394 }
395 if (perChannel.messagesQueue.isEmpty()) {
396 releaseWriteSuspended(ctx);
397 }
398 }
399 ctx.flush();
400 }
401 }