/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.handler.traffic;
17  
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
24  
25  /**
26   * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
27   * traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
28   * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>1</b>.</p>
29   *
30   * <p>The general use should be as follow:</p>
31   * <ul>
32   * <li><p>Add in your pipeline a new ChannelTrafficShapingHandler.</p>
33   * <p><tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();</tt></p>
34   * <p><tt>pipeline.addLast(myHandler);</tt></p>
35   *
36   * <p><b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
37   * for each new channel as the counter cannot be shared among all channels.</b>.</p>
38   *
39   * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
40   * or the check interval (in millisecond) that represents the delay between two computations of the
41   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
42   *
43   * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
44   * it is recommended to set a positive value, even if it is high since the precision of the
45   * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
46   * the less precise the traffic shaping will be. It is suggested as higher value something close
47   * to 5 or 10 minutes.</p>
48   *
49   * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
50   * </li>
51   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
52   * {@code channelWritabilityChanged(ctx)} to handle writability, or through
53   * {@code future.addListener(new GenericFutureListener())} on the future returned by
54   * {@code ctx.write()}.</li>
55   * <li><p>You shall also consider to have object size in read or write operations relatively adapted to
56   * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
57   * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
58   * <li><p>Some configuration methods will be taken as best effort, meaning
59   * that all already scheduled traffics will not be
60   * changed, but only applied to new traffics.</p>
61   * <p>So the expected usage of those methods are to be used not too often,
62   * accordingly to the traffic shaping configuration.</p></li>
63   * </ul>
64   */
65  public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
66      private final ArrayDeque<ToSend> messagesQueue = new ArrayDeque<ToSend>();
67      private long queueSize;
68  
69      /**
70       * Create a new instance.
71       *
72       * @param writeLimit
73       *            0 or a limit in bytes/s
74       * @param readLimit
75       *            0 or a limit in bytes/s
76       * @param checkInterval
77       *            The delay between two computations of performances for
78       *            channels or 0 if no stats are to be computed.
79       * @param maxTime
80       *            The maximum delay to wait in case of traffic excess.
81       */
82      public ChannelTrafficShapingHandler(long writeLimit, long readLimit,
83              long checkInterval, long maxTime) {
84          super(writeLimit, readLimit, checkInterval, maxTime);
85      }
86  
87      /**
88       * Create a new instance using default
89       * max time as delay allowed value of 15000 ms.
90       *
91       * @param writeLimit
92       *          0 or a limit in bytes/s
93       * @param readLimit
94       *          0 or a limit in bytes/s
95       * @param checkInterval
96       *          The delay between two computations of performances for
97       *            channels or 0 if no stats are to be computed.
98       */
99      public ChannelTrafficShapingHandler(long writeLimit,
100             long readLimit, long checkInterval) {
101         super(writeLimit, readLimit, checkInterval);
102     }
103 
104     /**
105      * Create a new instance using default Check Interval value of 1000 ms and
106      * max time as delay allowed value of 15000 ms.
107      *
108      * @param writeLimit
109      *          0 or a limit in bytes/s
110      * @param readLimit
111      *          0 or a limit in bytes/s
112      */
113     public ChannelTrafficShapingHandler(long writeLimit,
114             long readLimit) {
115         super(writeLimit, readLimit);
116     }
117 
118     /**
119      * Create a new instance using
120      * default max time as delay allowed value of 15000 ms and no limit.
121      *
122      * @param checkInterval
123      *          The delay between two computations of performances for
124      *            channels or 0 if no stats are to be computed.
125      */
126     public ChannelTrafficShapingHandler(long checkInterval) {
127         super(checkInterval);
128     }
129 
130     @Override
131     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
132         TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" +
133                 ctx.channel().hashCode(), checkInterval);
134         setTrafficCounter(trafficCounter);
135         trafficCounter.start();
136         super.handlerAdded(ctx);
137     }
138 
139     @Override
140     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
141         trafficCounter.stop();
142         // write order control
143         synchronized (this) {
144             if (ctx.channel().isActive()) {
145                 for (ToSend toSend : messagesQueue) {
146                     long size = calculateSize(toSend.toSend);
147                     trafficCounter.bytesRealWriteFlowControl(size);
148                     queueSize -= size;
149                     ctx.write(toSend.toSend, toSend.promise);
150                 }
151             } else {
152                 for (ToSend toSend : messagesQueue) {
153                     if (toSend.toSend instanceof ByteBuf) {
154                         ((ByteBuf) toSend.toSend).release();
155                     }
156                 }
157             }
158             messagesQueue.clear();
159         }
160         releaseWriteSuspended(ctx);
161         releaseReadSuspended(ctx);
162         super.handlerRemoved(ctx);
163     }
164 
165     private static final class ToSend {
166         final long relativeTimeAction;
167         final Object toSend;
168         final ChannelPromise promise;
169 
170         private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
171             relativeTimeAction = delay;
172             this.toSend = toSend;
173             this.promise = promise;
174         }
175     }
176 
177     @Override
178     void submitWrite(final ChannelHandlerContext ctx, final Object msg,
179             final long size, final long delay, final long now,
180             final ChannelPromise promise) {
181         final ToSend newToSend;
182         // write order control
183         synchronized (this) {
184             if (delay == 0 && messagesQueue.isEmpty()) {
185                 trafficCounter.bytesRealWriteFlowControl(size);
186                 ctx.write(msg, promise);
187                 return;
188             }
189             newToSend = new ToSend(delay + now, msg, promise);
190             messagesQueue.addLast(newToSend);
191             queueSize += size;
192             checkWriteSuspend(ctx, delay, queueSize);
193         }
194         final long futureNow = newToSend.relativeTimeAction;
195         ctx.executor().schedule(new Runnable() {
196             @Override
197             public void run() {
198                 sendAllValid(ctx, futureNow);
199             }
200         }, delay, TimeUnit.MILLISECONDS);
201     }
202 
203     private void sendAllValid(final ChannelHandlerContext ctx, final long now) {
204         // write order control
205         synchronized (this) {
206             ToSend newToSend = messagesQueue.pollFirst();
207             for (; newToSend != null; newToSend = messagesQueue.pollFirst()) {
208                 if (newToSend.relativeTimeAction <= now) {
209                     long size = calculateSize(newToSend.toSend);
210                     trafficCounter.bytesRealWriteFlowControl(size);
211                     queueSize -= size;
212                     ctx.write(newToSend.toSend, newToSend.promise);
213                 } else {
214                     messagesQueue.addFirst(newToSend);
215                     break;
216                 }
217             }
218             if (messagesQueue.isEmpty()) {
219                 releaseWriteSuspended(ctx);
220             }
221         }
222         ctx.flush();
223     }
224 
225     /**
226     * @return current size in bytes of the write buffer.
227     */
228    public long queueSize() {
229        return queueSize;
230    }
231 }