View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandler.Sharable;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.util.concurrent.EventExecutor;
24  import io.netty.util.internal.PlatformDependent;
25  
26  import java.util.ArrayDeque;
27  import java.util.concurrent.ConcurrentMap;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  /**
33   * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
34   * traffic shaping, that is to say a global limitation of the bandwidth, whatever
35   * the number of opened channels.</p>
36   * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p>
37   *
38   * <p>The general use should be as follows:</p>
39   * <ul>
40   * <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
41   * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt></p>
42   * <p>The executor could be the underlying IO worker pool</p>
43   * <p><tt>pipeline.addLast(myHandler);</tt></p>
44   *
45   * <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
46   * and shared among all channels as the counter must be shared among all channels.</b></p>
47   *
48   * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
49   * or the check interval (in millisecond) that represents the delay between two computations of the
50   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
51   *
52   * <p>A value of 0 for checkInterval means no accounting. If you need traffic shaping but no such accounting,
53   * it is recommended to set a positive value, even a high one, since the precision of the
54   * traffic shaping depends on the period over which the traffic is computed. The higher the interval,
55   * the less precise the traffic shaping will be. As an upper value, something close
56   * to 5 or 10 minutes is suggested.</p>
57   *
58   * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
59   * </li>
60   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
61   * {@code channelWritabilityChanged(ctx)} to handle writability, or through
62   * {@code future.addListener(new GenericFutureListener())} on the future returned by
63   * {@code ctx.write()}.</li>
64   * <li><p>You should also keep the size of the objects used in read or write operations reasonably adapted to
65   * the bandwidth you require: for instance, having 10 MB objects for 10 KB/s will lead to a burst effect,
66   * while having 100 KB objects for 1 MB/s should be handled smoothly by this TrafficShaping handler.</p></li>
67   * <li><p>Some configuration methods are applied on a best-effort basis, meaning
68   * that traffic that is already scheduled will not be
69   * changed; the new values only apply to new traffic.</p>
70   * These methods are therefore expected to be called infrequently,
71   * in accordance with the traffic shaping configuration.</li>
72   * </ul>
73   *
74   * Be sure to call {@link #release()} once this handler is no longer needed, to release all internal resources.
75   * This will not shut down the {@link EventExecutor} as it may be shared, so you need to do that yourself.
76   */
77  @Sharable
78  public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
79      /**
80       * All queues per channel
81       */
82      private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
83  
84      /**
85       * Global queues size
86       */
87      private final AtomicLong queuesSize = new AtomicLong();
88  
89      /**
90       * Max size in the list before proposing to stop writing new objects from next handlers
91       * for all channel (global)
92       */
93      long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
94  
95      private static final class PerChannel {
96          ArrayDeque<ToSend> messagesQueue;
97          long queueSize;
98          long lastWriteTimestamp;
99          long lastReadTimestamp;
100     }
101 
102     /**
103      * Create the global TrafficCounter.
104      */
105     void createGlobalTrafficCounter(ScheduledExecutorService executor) {
106         if (executor == null) {
107             throw new NullPointerException("executor");
108         }
109         TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval);
110         setTrafficCounter(tc);
111         tc.start();
112     }
113 
114     @Override
115     protected int userDefinedWritabilityIndex() {
116         return AbstractTrafficShapingHandler.GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
117     }
118 
119     /**
120      * Create a new instance.
121      *
122      * @param executor
123      *            the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
124      * @param writeLimit
125      *            0 or a limit in bytes/s
126      * @param readLimit
127      *            0 or a limit in bytes/s
128      * @param checkInterval
129      *            The delay between two computations of performances for
130      *            channels or 0 if no stats are to be computed.
131      * @param maxTime
132      *            The maximum delay to wait in case of traffic excess.
133      */
134     public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
135             long checkInterval, long maxTime) {
136         super(writeLimit, readLimit, checkInterval, maxTime);
137         createGlobalTrafficCounter(executor);
138     }
139 
140     /**
141      * Create a new instance using
142      * default max time as delay allowed value of 15000 ms.
143      *
144      * @param executor
145      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
146      * @param writeLimit
147      *          0 or a limit in bytes/s
148      * @param readLimit
149      *          0 or a limit in bytes/s
150      * @param checkInterval
151      *          The delay between two computations of performances for
152      *            channels or 0 if no stats are to be computed.
153      */
154     public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
155             long readLimit, long checkInterval) {
156         super(writeLimit, readLimit, checkInterval);
157         createGlobalTrafficCounter(executor);
158     }
159 
160     /**
161      * Create a new instance using default Check Interval value of 1000 ms and
162      * default max time as delay allowed value of 15000 ms.
163      *
164      * @param executor
165      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
166      * @param writeLimit
167      *          0 or a limit in bytes/s
168      * @param readLimit
169      *          0 or a limit in bytes/s
170      */
171     public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
172             long readLimit) {
173         super(writeLimit, readLimit);
174         createGlobalTrafficCounter(executor);
175     }
176 
177     /**
178      * Create a new instance using
179      * default max time as delay allowed value of 15000 ms and no limit.
180      *
181      * @param executor
182      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
183      * @param checkInterval
184      *          The delay between two computations of performances for
185      *            channels or 0 if no stats are to be computed.
186      */
187     public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
188         super(checkInterval);
189         createGlobalTrafficCounter(executor);
190     }
191 
192     /**
193      * Create a new instance using default Check Interval value of 1000 ms and
194      * default max time as delay allowed value of 15000 ms and no limit.
195      *
196      * @param executor
197      *          the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
198      */
199     public GlobalTrafficShapingHandler(EventExecutor executor) {
200         createGlobalTrafficCounter(executor);
201     }
202 
203     /**
204      * @return the maxGlobalWriteSize default value being 400 MB.
205      */
206     public long getMaxGlobalWriteSize() {
207         return maxGlobalWriteSize;
208     }
209 
210     /**
211      * Note the change will be taken as best effort, meaning
212      * that all already scheduled traffics will not be
213      * changed, but only applied to new traffics.<br>
214      * So the expected usage of this method is to be used not too often,
215      * accordingly to the traffic shaping configuration.
216      *
217      * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
218      *            globally for all channels before write suspended is set,
219      *            default value being 400 MB.
220      */
221     public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
222         this.maxGlobalWriteSize = maxGlobalWriteSize;
223     }
224 
225     /**
226      * @return the global size of the buffers for all queues.
227      */
228     public long queuesSize() {
229         return queuesSize.get();
230     }
231 
232     /**
233      * Release all internal resources of this instance.
234      */
235     public final void release() {
236         trafficCounter.stop();
237     }
238 
239     private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
240         // ensure creation is limited to one thread per channel
241         Channel channel = ctx.channel();
242         Integer key = channel.hashCode();
243         PerChannel perChannel = channelQueues.get(key);
244         if (perChannel == null) {
245             perChannel = new PerChannel();
246             perChannel.messagesQueue = new ArrayDeque<ToSend>();
247             perChannel.queueSize = 0L;
248             perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
249             perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
250             channelQueues.put(key, perChannel);
251         }
252         return perChannel;
253     }
254 
255     @Override
256     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
257         getOrSetPerChannel(ctx);
258         super.handlerAdded(ctx);
259     }
260 
261     @Override
262     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
263         Channel channel = ctx.channel();
264         Integer key = channel.hashCode();
265         PerChannel perChannel = channelQueues.remove(key);
266         if (perChannel != null) {
267             // write operations need synchronization
268             synchronized (perChannel) {
269                 if (channel.isActive()) {
270                     for (ToSend toSend : perChannel.messagesQueue) {
271                         long size = calculateSize(toSend.toSend);
272                         trafficCounter.bytesRealWriteFlowControl(size);
273                         perChannel.queueSize -= size;
274                         queuesSize.addAndGet(-size);
275                         ctx.write(toSend.toSend, toSend.promise);
276                     }
277                 } else {
278                     queuesSize.addAndGet(-perChannel.queueSize);
279                     for (ToSend toSend : perChannel.messagesQueue) {
280                         if (toSend.toSend instanceof ByteBuf) {
281                             ((ByteBuf) toSend.toSend).release();
282                         }
283                     }
284                 }
285                 perChannel.messagesQueue.clear();
286             }
287         }
288         releaseWriteSuspended(ctx);
289         releaseReadSuspended(ctx);
290         super.handlerRemoved(ctx);
291     }
292 
293     @Override
294     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
295         Integer key = ctx.channel().hashCode();
296         PerChannel perChannel = channelQueues.get(key);
297         if (perChannel != null) {
298             if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
299                 wait = maxTime;
300             }
301         }
302         return wait;
303     }
304 
305     @Override
306     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
307         Integer key = ctx.channel().hashCode();
308         PerChannel perChannel = channelQueues.get(key);
309         if (perChannel != null) {
310             perChannel.lastReadTimestamp = now;
311         }
312     }
313 
314     private static final class ToSend {
315         final long relativeTimeAction;
316         final Object toSend;
317         final long size;
318         final ChannelPromise promise;
319 
320         private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
321             relativeTimeAction = delay;
322             this.toSend = toSend;
323             this.size = size;
324             this.promise = promise;
325         }
326     }
327 
328     @Override
329     void submitWrite(final ChannelHandlerContext ctx, final Object msg,
330             final long size, final long writedelay, final long now,
331             final ChannelPromise promise) {
332         Channel channel = ctx.channel();
333         Integer key = channel.hashCode();
334         PerChannel perChannel = channelQueues.get(key);
335         if (perChannel == null) {
336             // in case write occurs before handlerAdded is raised for this handler
337             // imply a synchronized only if needed
338             perChannel = getOrSetPerChannel(ctx);
339         }
340         final ToSend newToSend;
341         long delay = writedelay;
342         boolean globalSizeExceeded = false;
343         // write operations need synchronization
344         synchronized (perChannel) {
345             if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
346                 trafficCounter.bytesRealWriteFlowControl(size);
347                 ctx.write(msg, promise);
348                 perChannel.lastWriteTimestamp = now;
349                 return;
350             }
351             if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
352                 delay = maxTime;
353             }
354             newToSend = new ToSend(delay + now, msg, size, promise);
355             perChannel.messagesQueue.addLast(newToSend);
356             perChannel.queueSize += size;
357             queuesSize.addAndGet(size);
358             checkWriteSuspend(ctx, delay, perChannel.queueSize);
359             if (queuesSize.get() > maxGlobalWriteSize) {
360                 globalSizeExceeded = true;
361             }
362         }
363         if (globalSizeExceeded) {
364             setUserDefinedWritability(ctx, false);
365         }
366         final long futureNow = newToSend.relativeTimeAction;
367         final PerChannel forSchedule = perChannel;
368         ctx.executor().schedule(new Runnable() {
369             @Override
370             public void run() {
371                 sendAllValid(ctx, forSchedule, futureNow);
372             }
373         }, delay, TimeUnit.MILLISECONDS);
374     }
375 
376     private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
377         // write operations need synchronization
378         synchronized (perChannel) {
379             ToSend newToSend = perChannel.messagesQueue.pollFirst();
380             for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
381                 if (newToSend.relativeTimeAction <= now) {
382                     long size = newToSend.size;
383                     trafficCounter.bytesRealWriteFlowControl(size);
384                     perChannel.queueSize -= size;
385                     queuesSize.addAndGet(-size);
386                     ctx.write(newToSend.toSend, newToSend.promise);
387                     perChannel.lastWriteTimestamp = now;
388                 } else {
389                     perChannel.messagesQueue.addFirst(newToSend);
390                     break;
391                 }
392             }
393             if (perChannel.messagesQueue.isEmpty()) {
394                 releaseWriteSuspended(ctx);
395             }
396         }
397         ctx.flush();
398     }
399 }