View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandler.Sharable;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.util.concurrent.EventExecutor;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.PlatformDependent;
26  
27  import java.util.ArrayDeque;
28  import java.util.concurrent.ConcurrentMap;
29  import java.util.concurrent.ScheduledExecutorService;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  /**
34   * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
35   * traffic shaping, that is to say a global limitation of the bandwidth, whatever
36   * the number of opened channels.</p>
37   * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p>
38   *
39   * <p>The general use should be as follow:</p>
40   * <ul>
41   * <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
42   * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt></p>
43   * <p>The executor could be the underlying IO worker pool</p>
44   * <p><tt>pipeline.addLast(myHandler);</tt></p>
45   *
46   * <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
47   * and shared among all channels as the counter must be shared among all channels.</b></p>
48   *
49   * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
50   * or the check interval (in millisecond) that represents the delay between two computations of the
51   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
52   *
53   * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
54   * it is recommended to set a positive value, even if it is high since the precision of the
55   * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
56   * the less precise the traffic shaping will be. It is suggested as higher value something close
57   * to 5 or 10 minutes.</p>
58   *
59   * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
60   * </li>
61   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
62   * {@code channelWritabilityChanged(ctx)} to handle writability, or through
63   * {@code future.addListener(new GenericFutureListener())} on the future returned by
64   * {@code ctx.write()}.</li>
65   * <li><p>You shall also consider to have object size in read or write operations relatively adapted to
66   * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
67   * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
68   * <li><p>Some configuration methods will be taken as best effort, meaning
69   * that all already scheduled traffics will not be
70   * changed, but only applied to new traffics.</p>
71   * So the expected usage of those methods are to be used not too often,
72   * accordingly to the traffic shaping configuration.</li>
73   * </ul>
74   *
75   * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
76   * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
77   */
78  @Sharable
79  public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
80      /**
81       * All queues per channel
82       */
83      private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
84  
85      /**
86       * Global queues size
87       */
88      private final AtomicLong queuesSize = new AtomicLong();
89  
90      /**
91       * Max size in the list before proposing to stop writing new objects from next handlers
92       * for all channel (global)
93       */
94      long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
95  
96      private static final class PerChannel {
97          ArrayDeque<ToSend> messagesQueue;
98          long queueSize;
99          long lastWriteTimestamp;
100         long lastReadTimestamp;
101     }
102 
103     /**
104      * Create the global TrafficCounter.
105      */
106     void createGlobalTrafficCounter(ScheduledExecutorService executor) {
107         TrafficCounter tc = new TrafficCounter(this,
108                 ObjectUtil.checkNotNull(executor, "executor"),
109                 "GlobalTC",
110                 checkInterval);
111 
112         setTrafficCounter(tc);
113         tc.start();
114     }
115 
116     @Override
117     protected int userDefinedWritabilityIndex() {
118         return AbstractTrafficShapingHandler.GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
119     }
120 
121     /**
122      * Create a new instance.
123      *
124      * @param executor
125      *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
126      * @param writeLimit
127      *            0 or a limit in bytes/s
128      * @param readLimit
129      *            0 or a limit in bytes/s
130      * @param checkInterval
131      *            The delay between two computations of performances for
132      *            channels or 0 if no stats are to be computed.
133      * @param maxTime
134      *            The maximum delay to wait in case of traffic excess.
135      */
136     public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
137             long checkInterval, long maxTime) {
138         super(writeLimit, readLimit, checkInterval, maxTime);
139         createGlobalTrafficCounter(executor);
140     }
141 
142     /**
143      * Create a new instance using
144      * default max time as delay allowed value of 15000 ms.
145      *
146      * @param executor
147      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
148      * @param writeLimit
149      *          0 or a limit in bytes/s
150      * @param readLimit
151      *          0 or a limit in bytes/s
152      * @param checkInterval
153      *          The delay between two computations of performances for
154      *            channels or 0 if no stats are to be computed.
155      */
156     public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
157             long readLimit, long checkInterval) {
158         super(writeLimit, readLimit, checkInterval);
159         createGlobalTrafficCounter(executor);
160     }
161 
162     /**
163      * Create a new instance using default Check Interval value of 1000 ms and
164      * default max time as delay allowed value of 15000 ms.
165      *
166      * @param executor
167      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
168      * @param writeLimit
169      *          0 or a limit in bytes/s
170      * @param readLimit
171      *          0 or a limit in bytes/s
172      */
173     public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
174             long readLimit) {
175         super(writeLimit, readLimit);
176         createGlobalTrafficCounter(executor);
177     }
178 
179     /**
180      * Create a new instance using
181      * default max time as delay allowed value of 15000 ms and no limit.
182      *
183      * @param executor
184      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
185      * @param checkInterval
186      *          The delay between two computations of performances for
187      *            channels or 0 if no stats are to be computed.
188      */
189     public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
190         super(checkInterval);
191         createGlobalTrafficCounter(executor);
192     }
193 
194     /**
195      * Create a new instance using default Check Interval value of 1000 ms and
196      * default max time as delay allowed value of 15000 ms and no limit.
197      *
198      * @param executor
199      *          the {@link EventExecutor} to use for the {@link TrafficCounter}.
200      */
201     public GlobalTrafficShapingHandler(EventExecutor executor) {
202         createGlobalTrafficCounter(executor);
203     }
204 
205     /**
206      * @return the maxGlobalWriteSize default value being 400 MB.
207      */
208     public long getMaxGlobalWriteSize() {
209         return maxGlobalWriteSize;
210     }
211 
212     /**
213      * Note the change will be taken as best effort, meaning
214      * that all already scheduled traffics will not be
215      * changed, but only applied to new traffics.<br>
216      * So the expected usage of this method is to be used not too often,
217      * accordingly to the traffic shaping configuration.
218      *
219      * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
220      *            globally for all channels before write suspended is set,
221      *            default value being 400 MB.
222      */
223     public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
224         this.maxGlobalWriteSize = maxGlobalWriteSize;
225     }
226 
227     /**
228      * @return the global size of the buffers for all queues.
229      */
230     public long queuesSize() {
231         return queuesSize.get();
232     }
233 
234     /**
235      * Release all internal resources of this instance.
236      */
237     public final void release() {
238         trafficCounter.stop();
239     }
240 
241     private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
242         // ensure creation is limited to one thread per channel
243         Channel channel = ctx.channel();
244         Integer key = channel.hashCode();
245         PerChannel perChannel = channelQueues.get(key);
246         if (perChannel == null) {
247             perChannel = new PerChannel();
248             perChannel.messagesQueue = new ArrayDeque<ToSend>();
249             perChannel.queueSize = 0L;
250             perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
251             perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
252             channelQueues.put(key, perChannel);
253         }
254         return perChannel;
255     }
256 
257     @Override
258     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
259         getOrSetPerChannel(ctx);
260         super.handlerAdded(ctx);
261     }
262 
263     @Override
264     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
265         Channel channel = ctx.channel();
266         Integer key = channel.hashCode();
267         PerChannel perChannel = channelQueues.remove(key);
268         if (perChannel != null) {
269             // write operations need synchronization
270             synchronized (perChannel) {
271                 if (channel.isActive()) {
272                     for (ToSend toSend : perChannel.messagesQueue) {
273                         long size = calculateSize(toSend.toSend);
274                         trafficCounter.bytesRealWriteFlowControl(size);
275                         perChannel.queueSize -= size;
276                         queuesSize.addAndGet(-size);
277                         ctx.write(toSend.toSend, toSend.promise);
278                     }
279                 } else {
280                     queuesSize.addAndGet(-perChannel.queueSize);
281                     for (ToSend toSend : perChannel.messagesQueue) {
282                         if (toSend.toSend instanceof ByteBuf) {
283                             ((ByteBuf) toSend.toSend).release();
284                         }
285                     }
286                 }
287                 perChannel.messagesQueue.clear();
288             }
289         }
290         releaseWriteSuspended(ctx);
291         releaseReadSuspended(ctx);
292         super.handlerRemoved(ctx);
293     }
294 
295     @Override
296     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
297         Integer key = ctx.channel().hashCode();
298         PerChannel perChannel = channelQueues.get(key);
299         if (perChannel != null) {
300             if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
301                 wait = maxTime;
302             }
303         }
304         return wait;
305     }
306 
307     @Override
308     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
309         Integer key = ctx.channel().hashCode();
310         PerChannel perChannel = channelQueues.get(key);
311         if (perChannel != null) {
312             perChannel.lastReadTimestamp = now;
313         }
314     }
315 
316     private static final class ToSend {
317         final long relativeTimeAction;
318         final Object toSend;
319         final long size;
320         final ChannelPromise promise;
321 
322         private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
323             relativeTimeAction = delay;
324             this.toSend = toSend;
325             this.size = size;
326             this.promise = promise;
327         }
328     }
329 
330     @Override
331     void submitWrite(final ChannelHandlerContext ctx, final Object msg,
332             final long size, final long writedelay, final long now,
333             final ChannelPromise promise) {
334         Channel channel = ctx.channel();
335         Integer key = channel.hashCode();
336         PerChannel perChannel = channelQueues.get(key);
337         if (perChannel == null) {
338             // in case write occurs before handlerAdded is raised for this handler
339             // imply a synchronized only if needed
340             perChannel = getOrSetPerChannel(ctx);
341         }
342         final ToSend newToSend;
343         long delay = writedelay;
344         boolean globalSizeExceeded = false;
345         // write operations need synchronization
346         synchronized (perChannel) {
347             if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
348                 trafficCounter.bytesRealWriteFlowControl(size);
349                 ctx.write(msg, promise);
350                 perChannel.lastWriteTimestamp = now;
351                 return;
352             }
353             if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
354                 delay = maxTime;
355             }
356             newToSend = new ToSend(delay + now, msg, size, promise);
357             perChannel.messagesQueue.addLast(newToSend);
358             perChannel.queueSize += size;
359             queuesSize.addAndGet(size);
360             checkWriteSuspend(ctx, delay, perChannel.queueSize);
361             if (queuesSize.get() > maxGlobalWriteSize) {
362                 globalSizeExceeded = true;
363             }
364         }
365         if (globalSizeExceeded) {
366             setUserDefinedWritability(ctx, false);
367         }
368         final long futureNow = newToSend.relativeTimeAction;
369         final PerChannel forSchedule = perChannel;
370         ctx.executor().schedule(new Runnable() {
371             @Override
372             public void run() {
373                 sendAllValid(ctx, forSchedule, futureNow);
374             }
375         }, delay, TimeUnit.MILLISECONDS);
376     }
377 
378     private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
379         // write operations need synchronization
380         synchronized (perChannel) {
381             ToSend newToSend = perChannel.messagesQueue.pollFirst();
382             for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
383                 if (newToSend.relativeTimeAction <= now) {
384                     long size = newToSend.size;
385                     trafficCounter.bytesRealWriteFlowControl(size);
386                     perChannel.queueSize -= size;
387                     queuesSize.addAndGet(-size);
388                     ctx.write(newToSend.toSend, newToSend.promise);
389                     perChannel.lastWriteTimestamp = now;
390                 } else {
391                     perChannel.messagesQueue.addFirst(newToSend);
392                     break;
393                 }
394             }
395             if (perChannel.messagesQueue.isEmpty()) {
396                 releaseWriteSuspended(ctx);
397             }
398         }
399         ctx.flush();
400     }
401 }