1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.traffic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelPromise;
21
22 import java.util.ArrayDeque;
23 import java.util.concurrent.TimeUnit;
24
25 /**
26 * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
27 * traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
28 * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>1</b>.</p>
29 *
 * <p>The general use should be as follows:</p>
31 * <ul>
32 * <li><p>Add in your pipeline a new ChannelTrafficShapingHandler.</p>
33 * <p><tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();</tt></p>
34 * <p><tt>pipeline.addLast(myHandler);</tt></p>
35 *
 * <p><b>Note that this handler has a Pipeline Coverage of "one", which means a new handler must be created
 * for each new channel, as the counter cannot be shared among all channels.</b></p>
38 *
 * <p>Other arguments can be passed, like the write or read limitation (in bytes/s, where 0 means no limitation)
 * or the check interval (in milliseconds) that represents the delay between two computations of the
 * bandwidth and so the callback of the doAccounting method (0 means no accounting at all).</p>
42 *
 * <p>A value of 0 for checkInterval means no accounting. If you need traffic shaping but no such accounting,
 * it is still recommended to set a positive value, even a high one, since the precision of the
 * traffic shaping depends on the period over which the traffic is computed. The higher the interval,
 * the less precise the traffic shaping will be. As an upper bound, a value close
 * to 5 or 10 minutes is suggested.</p>
48 *
 * <p>maxTimeToWait, set to 15s by default, specifies an upper bound on the delay applied by traffic shaping.</p>
50 * </li>
 * <li>In your handler, you should consider using {@code channel.isWritable()} and
 * {@code channelWritabilityChanged(ctx)} to handle writability, or adding a listener with
 * {@code future.addListener(new GenericFutureListener())} to the future returned by
 * {@code ctx.write()}.</li>
 * <li><p>You should also consider keeping the size of objects in read or write operations in proportion to
 * the bandwidth you require: for instance, having 10 MB objects for 10KB/s will lead to a burst effect,
 * while having 100 KB objects for 1 MB/s should be smoothly handled by this TrafficShaping handler.</p></li>
 * <li><p>Some configuration methods are applied on a best-effort basis, meaning
 * that traffic which is already scheduled will not be
 * changed; the new settings only apply to new traffic.</p>
 * <p>These methods are therefore expected to be called sparingly,
 * in accordance with the traffic shaping configuration.</p></li>
63 * </ul>
64 */
65 public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
66 private final ArrayDeque<ToSend> messagesQueue = new ArrayDeque<ToSend>();
67 private long queueSize;
68
69 /**
70 * Create a new instance.
71 *
72 * @param writeLimit
73 * 0 or a limit in bytes/s
74 * @param readLimit
75 * 0 or a limit in bytes/s
76 * @param checkInterval
77 * The delay between two computations of performances for
78 * channels or 0 if no stats are to be computed.
79 * @param maxTime
80 * The maximum delay to wait in case of traffic excess.
81 */
82 public ChannelTrafficShapingHandler(long writeLimit, long readLimit,
83 long checkInterval, long maxTime) {
84 super(writeLimit, readLimit, checkInterval, maxTime);
85 }
86
87 /**
88 * Create a new instance using default
89 * max time as delay allowed value of 15000 ms.
90 *
91 * @param writeLimit
92 * 0 or a limit in bytes/s
93 * @param readLimit
94 * 0 or a limit in bytes/s
95 * @param checkInterval
96 * The delay between two computations of performances for
97 * channels or 0 if no stats are to be computed.
98 */
99 public ChannelTrafficShapingHandler(long writeLimit,
100 long readLimit, long checkInterval) {
101 super(writeLimit, readLimit, checkInterval);
102 }
103
104 /**
105 * Create a new instance using default Check Interval value of 1000 ms and
106 * max time as delay allowed value of 15000 ms.
107 *
108 * @param writeLimit
109 * 0 or a limit in bytes/s
110 * @param readLimit
111 * 0 or a limit in bytes/s
112 */
113 public ChannelTrafficShapingHandler(long writeLimit,
114 long readLimit) {
115 super(writeLimit, readLimit);
116 }
117
118 /**
119 * Create a new instance using
120 * default max time as delay allowed value of 15000 ms and no limit.
121 *
122 * @param checkInterval
123 * The delay between two computations of performances for
124 * channels or 0 if no stats are to be computed.
125 */
126 public ChannelTrafficShapingHandler(long checkInterval) {
127 super(checkInterval);
128 }
129
130 @Override
131 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
132 TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" +
133 ctx.channel().hashCode(), checkInterval);
134 setTrafficCounter(trafficCounter);
135 trafficCounter.start();
136 super.handlerAdded(ctx);
137 }
138
139 @Override
140 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
141 trafficCounter.stop();
142 // write order control
143 synchronized (this) {
144 if (ctx.channel().isActive()) {
145 for (ToSend toSend : messagesQueue) {
146 long size = calculateSize(toSend.toSend);
147 trafficCounter.bytesRealWriteFlowControl(size);
148 queueSize -= size;
149 ctx.write(toSend.toSend, toSend.promise);
150 }
151 } else {
152 for (ToSend toSend : messagesQueue) {
153 if (toSend.toSend instanceof ByteBuf) {
154 ((ByteBuf) toSend.toSend).release();
155 }
156 }
157 }
158 messagesQueue.clear();
159 }
160 releaseWriteSuspended(ctx);
161 releaseReadSuspended(ctx);
162 super.handlerRemoved(ctx);
163 }
164
165 private static final class ToSend {
166 final long relativeTimeAction;
167 final Object toSend;
168 final ChannelPromise promise;
169
170 private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
171 relativeTimeAction = delay;
172 this.toSend = toSend;
173 this.promise = promise;
174 }
175 }
176
177 @Override
178 void submitWrite(final ChannelHandlerContext ctx, final Object msg,
179 final long size, final long delay, final long now,
180 final ChannelPromise promise) {
181 final ToSend newToSend;
182 // write order control
183 synchronized (this) {
184 if (delay == 0 && messagesQueue.isEmpty()) {
185 trafficCounter.bytesRealWriteFlowControl(size);
186 ctx.write(msg, promise);
187 return;
188 }
189 newToSend = new ToSend(delay + now, msg, promise);
190 messagesQueue.addLast(newToSend);
191 queueSize += size;
192 checkWriteSuspend(ctx, delay, queueSize);
193 }
194 final long futureNow = newToSend.relativeTimeAction;
195 ctx.executor().schedule(new Runnable() {
196 @Override
197 public void run() {
198 sendAllValid(ctx, futureNow);
199 }
200 }, delay, TimeUnit.MILLISECONDS);
201 }
202
203 private void sendAllValid(final ChannelHandlerContext ctx, final long now) {
204 // write order control
205 synchronized (this) {
206 ToSend newToSend = messagesQueue.pollFirst();
207 for (; newToSend != null; newToSend = messagesQueue.pollFirst()) {
208 if (newToSend.relativeTimeAction <= now) {
209 long size = calculateSize(newToSend.toSend);
210 trafficCounter.bytesRealWriteFlowControl(size);
211 queueSize -= size;
212 ctx.write(newToSend.toSend, newToSend.promise);
213 } else {
214 messagesQueue.addFirst(newToSend);
215 break;
216 }
217 }
218 if (messagesQueue.isEmpty()) {
219 releaseWriteSuspended(ctx);
220 }
221 }
222 ctx.flush();
223 }
224
225 /**
226 * @return current size in bytes of the write buffer.
227 */
228 public long queueSize() {
229 return queueSize;
230 }
231 }