1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.handler.traffic;
17
18 import io.netty5.channel.Channel;
19 import io.netty5.channel.ChannelHandlerContext;
20 import io.netty5.util.Resource;
21 import io.netty5.util.concurrent.EventExecutor;
22 import io.netty5.util.concurrent.EventExecutorGroup;
23 import io.netty5.util.concurrent.Promise;
24
25 import java.util.ArrayDeque;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import static java.util.Objects.requireNonNull;
32
33 /**
34 * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
35 * traffic shaping, that is to say a global limitation of the bandwidth, whatever
36 * the number of opened channels.</p>
37 * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p>
38 *
39 * <p>The general use should be as follow:</p>
40 * <ul>
41 * <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
42 * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt></p>
43 * <p>The executor could be the underlying IO worker pool</p>
44 * <p><tt>pipeline.addLast(myHandler);</tt></p>
45 *
46 * <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
47 * and shared among all channels as the counter must be shared among all channels.</b></p>
48 *
49 * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
50 * or the check interval (in millisecond) that represents the delay between two computations of the
51 * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
52 *
53 * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
54 * it is recommended to set a positive value, even if it is high since the precision of the
55 * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
56 * the less precise the traffic shaping will be. It is suggested as higher value something close
57 * to 5 or 10 minutes.</p>
58 *
59 * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
60 * </li>
61 * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
62 * {@code channelWritabilityChanged(ctx)} to handle writability, or through
63 * {@code future.addListener(future -> ...)} on the future returned by
64 * {@code ctx.write()}.</li>
65 * <li><p>You shall also consider to have object size in read or write operations relatively adapted to
66 * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
67 * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
68 * <li><p>Some configuration methods will be taken as best effort, meaning
69 * that all already scheduled traffics will not be
70 * changed, but only applied to new traffics.</p>
71 * So the expected usage of those methods are to be used not too often,
72 * accordingly to the traffic shaping configuration.</li>
73 * </ul>
74 *
75 * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
76 * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
77 */
78 public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
79 /**
80 * All queues per channel
81 */
82 private final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<>();
83
84 /**
85 * Global queues size
86 */
87 private final AtomicLong queuesSize = new AtomicLong();
88
89 /**
90 * Max size in the list before proposing to stop writing new objects from next handlers
91 * for all channel (global)
92 */
93 long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
94
95 private static final class PerChannel {
96 ArrayDeque<ToSend> messagesQueue;
97 long queueSize;
98 long lastWriteTimestamp;
99 long lastReadTimestamp;
100 }
101
102 /**
103 * Create the global TrafficCounter.
104 */
105 void createGlobalTrafficCounter(EventExecutorGroup executor) {
106 requireNonNull(executor, "executor");
107 TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval);
108 setTrafficCounter(tc);
109 tc.start();
110 }
111
112 @Override
113 public boolean isSharable() {
114 return true;
115 }
116
117 @Override
118 protected int userDefinedWritabilityIndex() {
119 return AbstractTrafficShapingHandler.GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
120 }
121
122 /**
123 * Create a new instance.
124 *
125 * @param executor
126 * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
127 * @param writeLimit
128 * 0 or a limit in bytes/s
129 * @param readLimit
130 * 0 or a limit in bytes/s
131 * @param checkInterval
132 * The delay between two computations of performances for
133 * channels or 0 if no stats are to be computed.
134 * @param maxTime
135 * The maximum delay to wait in case of traffic excess.
136 */
137 public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, long readLimit,
138 long checkInterval, long maxTime) {
139 super(writeLimit, readLimit, checkInterval, maxTime);
140 createGlobalTrafficCounter(executor);
141 }
142
143 /**
144 * Create a new instance using
145 * default max time as delay allowed value of 15000 ms.
146 *
147 * @param executor
148 * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
149 * @param writeLimit
150 * 0 or a limit in bytes/s
151 * @param readLimit
152 * 0 or a limit in bytes/s
153 * @param checkInterval
154 * The delay between two computations of performances for
155 * channels or 0 if no stats are to be computed.
156 */
157 public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit,
158 long readLimit, long checkInterval) {
159 super(writeLimit, readLimit, checkInterval);
160 createGlobalTrafficCounter(executor);
161 }
162
163 /**
164 * Create a new instance using default Check Interval value of 1000 ms and
165 * default max time as delay allowed value of 15000 ms.
166 *
167 * @param executor
168 * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
169 * @param writeLimit
170 * 0 or a limit in bytes/s
171 * @param readLimit
172 * 0 or a limit in bytes/s
173 */
174 public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit,
175 long readLimit) {
176 super(writeLimit, readLimit);
177 createGlobalTrafficCounter(executor);
178 }
179
180 /**
181 * Create a new instance using
182 * default max time as delay allowed value of 15000 ms and no limit.
183 *
184 * @param executor
185 * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
186 * @param checkInterval
187 * The delay between two computations of performances for
188 * channels or 0 if no stats are to be computed.
189 */
190 public GlobalTrafficShapingHandler(EventExecutorGroup executor, long checkInterval) {
191 super(checkInterval);
192 createGlobalTrafficCounter(executor);
193 }
194
195 /**
196 * Create a new instance using default Check Interval value of 1000 ms and
197 * default max time as delay allowed value of 15000 ms and no limit.
198 *
199 * @param executor
200 * the {@link EventExecutor} to use for the {@link TrafficCounter}.
201 */
202 public GlobalTrafficShapingHandler(EventExecutor executor) {
203 createGlobalTrafficCounter(executor);
204 }
205
206 /**
207 * @return the maxGlobalWriteSize default value being 400 MB.
208 */
209 public long getMaxGlobalWriteSize() {
210 return maxGlobalWriteSize;
211 }
212
213 /**
214 * Note the change will be taken as best effort, meaning
215 * that all already scheduled traffics will not be
216 * changed, but only applied to new traffics.<br>
217 * So the expected usage of this method is to be used not too often,
218 * accordingly to the traffic shaping configuration.
219 *
220 * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
221 * globally for all channels before write suspended is set,
222 * default value being 400 MB.
223 */
224 public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
225 this.maxGlobalWriteSize = maxGlobalWriteSize;
226 }
227
228 /**
229 * @return the global size of the buffers for all queues.
230 */
231 public long queuesSize() {
232 return queuesSize.get();
233 }
234
235 /**
236 * Release all internal resources of this instance.
237 */
238 public final void release() {
239 trafficCounter.stop();
240 }
241
242 private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
243 // ensure creation is limited to one thread per channel
244 Channel channel = ctx.channel();
245 Integer key = channel.hashCode();
246 PerChannel perChannel = channelQueues.get(key);
247 if (perChannel == null) {
248 perChannel = new PerChannel();
249 perChannel.messagesQueue = new ArrayDeque<>();
250 perChannel.queueSize = 0L;
251 perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
252 perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
253 channelQueues.put(key, perChannel);
254 }
255 return perChannel;
256 }
257
258 @Override
259 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
260 getOrSetPerChannel(ctx);
261 super.handlerAdded(ctx);
262 }
263
264 @Override
265 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
266 Channel channel = ctx.channel();
267 Integer key = channel.hashCode();
268 PerChannel perChannel = channelQueues.remove(key);
269 if (perChannel != null) {
270 // write operations need synchronization
271 synchronized (perChannel) {
272 if (channel.isActive()) {
273 for (ToSend toSend : perChannel.messagesQueue) {
274 long size = calculateSize(toSend.toSend);
275 trafficCounter.bytesRealWriteFlowControl(size);
276 perChannel.queueSize -= size;
277 queuesSize.addAndGet(-size);
278 ctx.write(toSend.toSend).cascadeTo(toSend.promise);
279 }
280 } else {
281 queuesSize.addAndGet(-perChannel.queueSize);
282 for (ToSend toSend : perChannel.messagesQueue) {
283 if (Resource.isAccessible(toSend.toSend, false)) {
284 Resource.dispose(toSend.toSend);
285 }
286 }
287 }
288 perChannel.messagesQueue.clear();
289 }
290 }
291 releaseWriteSuspended(ctx);
292 releaseReadSuspended(ctx);
293 super.handlerRemoved(ctx);
294 }
295
296 @Override
297 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
298 Integer key = ctx.channel().hashCode();
299 PerChannel perChannel = channelQueues.get(key);
300 if (perChannel != null) {
301 if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
302 wait = maxTime;
303 }
304 }
305 return wait;
306 }
307
308 @Override
309 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
310 Integer key = ctx.channel().hashCode();
311 PerChannel perChannel = channelQueues.get(key);
312 if (perChannel != null) {
313 perChannel.lastReadTimestamp = now;
314 }
315 }
316
317 private static final class ToSend {
318 final long relativeTimeAction;
319 final Object toSend;
320 final long size;
321 final Promise<Void> promise;
322
323 private ToSend(final long delay, final Object toSend, final long size, final Promise<Void> promise) {
324 relativeTimeAction = delay;
325 this.toSend = toSend;
326 this.size = size;
327 this.promise = promise;
328 }
329 }
330
331 @Override
332 void submitWrite(final ChannelHandlerContext ctx, final Object msg,
333 final long size, final long writedelay, final long now,
334 final Promise<Void> promise) {
335 Channel channel = ctx.channel();
336 Integer key = channel.hashCode();
337 PerChannel perChannel = channelQueues.get(key);
338 if (perChannel == null) {
339 // in case write occurs before handlerAdded is raised for this handler
340 // imply a synchronized only if needed
341 perChannel = getOrSetPerChannel(ctx);
342 }
343 final ToSend newToSend;
344 long delay = writedelay;
345 boolean globalSizeExceeded = false;
346 // write operations need synchronization
347 synchronized (perChannel) {
348 if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
349 trafficCounter.bytesRealWriteFlowControl(size);
350 ctx.write(msg).cascadeTo(promise);
351 perChannel.lastWriteTimestamp = now;
352 return;
353 }
354 if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
355 delay = maxTime;
356 }
357 newToSend = new ToSend(delay + now, msg, size, promise);
358 perChannel.messagesQueue.addLast(newToSend);
359 perChannel.queueSize += size;
360 queuesSize.addAndGet(size);
361 checkWriteSuspend(ctx, delay, perChannel.queueSize);
362 if (queuesSize.get() > maxGlobalWriteSize) {
363 globalSizeExceeded = true;
364 }
365 }
366 if (globalSizeExceeded) {
367 setUserDefinedWritability(ctx, false);
368 }
369 final long futureNow = newToSend.relativeTimeAction;
370 final PerChannel forSchedule = perChannel;
371 ctx.executor().schedule(() -> sendAllValid(ctx, forSchedule, futureNow), delay, TimeUnit.MILLISECONDS);
372 }
373
374 private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
375 // write operations need synchronization
376 synchronized (perChannel) {
377 ToSend newToSend = perChannel.messagesQueue.pollFirst();
378 for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
379 if (newToSend.relativeTimeAction <= now) {
380 long size = newToSend.size;
381 trafficCounter.bytesRealWriteFlowControl(size);
382 perChannel.queueSize -= size;
383 queuesSize.addAndGet(-size);
384 ctx.write(newToSend.toSend).cascadeTo(newToSend.promise);
385 perChannel.lastWriteTimestamp = now;
386 } else {
387 perChannel.messagesQueue.addFirst(newToSend);
388 break;
389 }
390 }
391 if (perChannel.messagesQueue.isEmpty()) {
392 releaseWriteSuspended(ctx);
393 }
394 }
395 ctx.flush();
396 }
397 }