View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.traffic;
17  
18  import io.netty5.channel.Channel;
19  import io.netty5.channel.ChannelHandlerContext;
20  import io.netty5.util.Resource;
21  import io.netty5.util.concurrent.EventExecutor;
22  import io.netty5.util.concurrent.EventExecutorGroup;
23  import io.netty5.util.concurrent.Promise;
24  
25  import java.util.ArrayDeque;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ConcurrentMap;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  import static java.util.Objects.requireNonNull;
32  
33  /**
34   * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
35   * traffic shaping, that is to say a global limitation of the bandwidth, whatever
36   * the number of opened channels.</p>
37   * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p>
38   *
39   * <p>The general use should be as follows:</p>
40   * <ul>
41   * <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
42   * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt></p>
43   * <p>The executor could be the underlying IO worker pool</p>
44   * <p><tt>pipeline.addLast(myHandler);</tt></p>
45   *
46   * <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
47   * and shared among all channels as the counter must be shared among all channels.</b></p>
48   *
49   * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
50   * or the check interval (in millisecond) that represents the delay between two computations of the
51   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
52   *
53   * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
54   * it is recommended to set a positive value, even if it is high since the precision of the
55   * Traffic Shaping depends on the period where the traffic is computed. The higher the interval,
56   * the less precise the traffic shaping will be. As an upper bound, a value close
57   * to 5 or 10 minutes is suggested.</p>
58   *
59   * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
60   * </li>
61   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
62   * {@code channelWritabilityChanged(ctx)} to handle writability, or through
63   * {@code future.addListener(future -> ...)} on the future returned by
64   * {@code ctx.write()}.</li>
65   * <li><p>You shall also consider to have object size in read or write operations relatively adapted to
66   * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
67   * while having 100 KB objects for 1 MB/s should be smoothly handled by this TrafficShaping handler.</p></li>
68   * <li><p>Some configuration methods will be taken as best effort, meaning
69   * that all already scheduled traffics will not be
70   * changed, but only applied to new traffics.</p>
71   * So the expected usage of those methods are to be used not too often,
72   * accordingly to the traffic shaping configuration.</li>
73   * </ul>
74   *
75   * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
76   * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
77   */
78  public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
79      /**
80       * All queues per channel
81       */
82      private final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<>();
83  
84      /**
85       * Global queues size
86       */
87      private final AtomicLong queuesSize = new AtomicLong();
88  
89      /**
90       * Max size in the list before proposing to stop writing new objects from next handlers
91       * for all channel (global)
92       */
93      long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
94  
95      private static final class PerChannel {
96          ArrayDeque<ToSend> messagesQueue;
97          long queueSize;
98          long lastWriteTimestamp;
99          long lastReadTimestamp;
100     }
101 
102     /**
103      * Create the global TrafficCounter.
104      */
105     void createGlobalTrafficCounter(EventExecutorGroup executor) {
106         requireNonNull(executor, "executor");
107         TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval);
108         setTrafficCounter(tc);
109         tc.start();
110     }
111 
112     @Override
113     public boolean isSharable() {
114         return true;
115     }
116 
117     @Override
118     protected int userDefinedWritabilityIndex() {
119         return AbstractTrafficShapingHandler.GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
120     }
121 
122     /**
123      * Create a new instance.
124      *
125      * @param executor
126      *            the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
127      * @param writeLimit
128      *            0 or a limit in bytes/s
129      * @param readLimit
130      *            0 or a limit in bytes/s
131      * @param checkInterval
132      *            The delay between two computations of performances for
133      *            channels or 0 if no stats are to be computed.
134      * @param maxTime
135      *            The maximum delay to wait in case of traffic excess.
136      */
137     public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, long readLimit,
138             long checkInterval, long maxTime) {
139         super(writeLimit, readLimit, checkInterval, maxTime);
140         createGlobalTrafficCounter(executor);
141     }
142 
143     /**
144      * Create a new instance using
145      * default max time as delay allowed value of 15000 ms.
146      *
147      * @param executor
148      *          the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
149      * @param writeLimit
150      *          0 or a limit in bytes/s
151      * @param readLimit
152      *          0 or a limit in bytes/s
153      * @param checkInterval
154      *          The delay between two computations of performances for
155      *            channels or 0 if no stats are to be computed.
156      */
157     public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit,
158                                        long readLimit, long checkInterval) {
159         super(writeLimit, readLimit, checkInterval);
160         createGlobalTrafficCounter(executor);
161     }
162 
163     /**
164      * Create a new instance using default Check Interval value of 1000 ms and
165      * default max time as delay allowed value of 15000 ms.
166      *
167      * @param executor
168      *          the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
169      * @param writeLimit
170      *          0 or a limit in bytes/s
171      * @param readLimit
172      *          0 or a limit in bytes/s
173      */
174     public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit,
175             long readLimit) {
176         super(writeLimit, readLimit);
177         createGlobalTrafficCounter(executor);
178     }
179 
180     /**
181      * Create a new instance using
182      * default max time as delay allowed value of 15000 ms and no limit.
183      *
184      * @param executor
185      *          the {@link EventExecutorGroup} to use for the {@link TrafficCounter}.
186      * @param checkInterval
187      *          The delay between two computations of performances for
188      *            channels or 0 if no stats are to be computed.
189      */
190     public GlobalTrafficShapingHandler(EventExecutorGroup executor, long checkInterval) {
191         super(checkInterval);
192         createGlobalTrafficCounter(executor);
193     }
194 
195     /**
196      * Create a new instance using default Check Interval value of 1000 ms and
197      * default max time as delay allowed value of 15000 ms and no limit.
198      *
199      * @param executor
200      *          the {@link EventExecutor} to use for the {@link TrafficCounter}.
201      */
202     public GlobalTrafficShapingHandler(EventExecutor executor) {
203         createGlobalTrafficCounter(executor);
204     }
205 
206     /**
207      * @return the maxGlobalWriteSize default value being 400 MB.
208      */
209     public long getMaxGlobalWriteSize() {
210         return maxGlobalWriteSize;
211     }
212 
213     /**
214      * Note the change will be taken as best effort, meaning
215      * that all already scheduled traffics will not be
216      * changed, but only applied to new traffics.<br>
217      * So the expected usage of this method is to be used not too often,
218      * accordingly to the traffic shaping configuration.
219      *
220      * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
221      *            globally for all channels before write suspended is set,
222      *            default value being 400 MB.
223      */
224     public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
225         this.maxGlobalWriteSize = maxGlobalWriteSize;
226     }
227 
228     /**
229      * @return the global size of the buffers for all queues.
230      */
231     public long queuesSize() {
232         return queuesSize.get();
233     }
234 
235     /**
236      * Release all internal resources of this instance.
237      */
238     public final void release() {
239         trafficCounter.stop();
240     }
241 
242     private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
243         // ensure creation is limited to one thread per channel
244         Channel channel = ctx.channel();
245         Integer key = channel.hashCode();
246         PerChannel perChannel = channelQueues.get(key);
247         if (perChannel == null) {
248             perChannel = new PerChannel();
249             perChannel.messagesQueue = new ArrayDeque<>();
250             perChannel.queueSize = 0L;
251             perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
252             perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
253             channelQueues.put(key, perChannel);
254         }
255         return perChannel;
256     }
257 
258     @Override
259     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
260         getOrSetPerChannel(ctx);
261         super.handlerAdded(ctx);
262     }
263 
264     @Override
265     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
266         Channel channel = ctx.channel();
267         Integer key = channel.hashCode();
268         PerChannel perChannel = channelQueues.remove(key);
269         if (perChannel != null) {
270             // write operations need synchronization
271             synchronized (perChannel) {
272                 if (channel.isActive()) {
273                     for (ToSend toSend : perChannel.messagesQueue) {
274                         long size = calculateSize(toSend.toSend);
275                         trafficCounter.bytesRealWriteFlowControl(size);
276                         perChannel.queueSize -= size;
277                         queuesSize.addAndGet(-size);
278                         ctx.write(toSend.toSend).cascadeTo(toSend.promise);
279                     }
280                 } else {
281                     queuesSize.addAndGet(-perChannel.queueSize);
282                     for (ToSend toSend : perChannel.messagesQueue) {
283                         if (Resource.isAccessible(toSend.toSend, false)) {
284                             Resource.dispose(toSend.toSend);
285                         }
286                     }
287                 }
288                 perChannel.messagesQueue.clear();
289             }
290         }
291         releaseWriteSuspended(ctx);
292         releaseReadSuspended(ctx);
293         super.handlerRemoved(ctx);
294     }
295 
296     @Override
297     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
298         Integer key = ctx.channel().hashCode();
299         PerChannel perChannel = channelQueues.get(key);
300         if (perChannel != null) {
301             if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
302                 wait = maxTime;
303             }
304         }
305         return wait;
306     }
307 
308     @Override
309     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
310         Integer key = ctx.channel().hashCode();
311         PerChannel perChannel = channelQueues.get(key);
312         if (perChannel != null) {
313             perChannel.lastReadTimestamp = now;
314         }
315     }
316 
317     private static final class ToSend {
318         final long relativeTimeAction;
319         final Object toSend;
320         final long size;
321         final Promise<Void> promise;
322 
323         private ToSend(final long delay, final Object toSend, final long size, final Promise<Void> promise) {
324             relativeTimeAction = delay;
325             this.toSend = toSend;
326             this.size = size;
327             this.promise = promise;
328         }
329     }
330 
331     @Override
332     void submitWrite(final ChannelHandlerContext ctx, final Object msg,
333             final long size, final long writedelay, final long now,
334             final Promise<Void> promise) {
335         Channel channel = ctx.channel();
336         Integer key = channel.hashCode();
337         PerChannel perChannel = channelQueues.get(key);
338         if (perChannel == null) {
339             // in case write occurs before handlerAdded is raised for this handler
340             // imply a synchronized only if needed
341             perChannel = getOrSetPerChannel(ctx);
342         }
343         final ToSend newToSend;
344         long delay = writedelay;
345         boolean globalSizeExceeded = false;
346         // write operations need synchronization
347         synchronized (perChannel) {
348             if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
349                 trafficCounter.bytesRealWriteFlowControl(size);
350                 ctx.write(msg).cascadeTo(promise);
351                 perChannel.lastWriteTimestamp = now;
352                 return;
353             }
354             if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
355                 delay = maxTime;
356             }
357             newToSend = new ToSend(delay + now, msg, size, promise);
358             perChannel.messagesQueue.addLast(newToSend);
359             perChannel.queueSize += size;
360             queuesSize.addAndGet(size);
361             checkWriteSuspend(ctx, delay, perChannel.queueSize);
362             if (queuesSize.get() > maxGlobalWriteSize) {
363                 globalSizeExceeded = true;
364             }
365         }
366         if (globalSizeExceeded) {
367             setUserDefinedWritability(ctx, false);
368         }
369         final long futureNow = newToSend.relativeTimeAction;
370         final PerChannel forSchedule = perChannel;
371         ctx.executor().schedule(() -> sendAllValid(ctx, forSchedule, futureNow), delay, TimeUnit.MILLISECONDS);
372     }
373 
374     private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
375         // write operations need synchronization
376         synchronized (perChannel) {
377             ToSend newToSend = perChannel.messagesQueue.pollFirst();
378             for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
379                 if (newToSend.relativeTimeAction <= now) {
380                     long size = newToSend.size;
381                     trafficCounter.bytesRealWriteFlowControl(size);
382                     perChannel.queueSize -= size;
383                     queuesSize.addAndGet(-size);
384                     ctx.write(newToSend.toSend).cascadeTo(newToSend.promise);
385                     perChannel.lastWriteTimestamp = now;
386                 } else {
387                     perChannel.messagesQueue.addFirst(newToSend);
388                     break;
389                 }
390             }
391             if (perChannel.messagesQueue.isEmpty()) {
392                 releaseWriteSuspended(ctx);
393             }
394         }
395         ctx.flush();
396     }
397 }