1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelHandlerContext;
20 import org.jboss.netty.channel.ChannelPipelineFactory;
21 import org.jboss.netty.channel.ChannelStateEvent;
22 import org.jboss.netty.channel.MessageEvent;
23 import org.jboss.netty.handler.execution.ExecutionHandler;
24 import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
25 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
26 import org.jboss.netty.util.ObjectSizeEstimator;
27 import org.jboss.netty.util.Timeout;
28 import org.jboss.netty.util.Timer;
29 import org.jboss.netty.util.TimerTask;
30
31 import java.util.LinkedList;
32 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
83 private final List<ToSend> messagesQueue = new LinkedList<ToSend>();
84 private long queueSize;
85 private volatile Timeout writeTimeout;
86 private volatile ChannelHandlerContext ctx;
87
88 public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
89 long readLimit, long checkInterval) {
90 super(timer, writeLimit, readLimit, checkInterval);
91 }
92
93 public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
94 long readLimit, long checkInterval, long maxTime) {
95 super(timer, writeLimit, readLimit, checkInterval, maxTime);
96 }
97
98 public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
99 long readLimit) {
100 super(timer, writeLimit, readLimit);
101 }
102
103 public ChannelTrafficShapingHandler(Timer timer, long checkInterval) {
104 super(timer, checkInterval);
105 }
106
107 public ChannelTrafficShapingHandler(Timer timer) {
108 super(timer);
109 }
110
111 public ChannelTrafficShapingHandler(
112 ObjectSizeEstimator objectSizeEstimator, Timer timer,
113 long writeLimit, long readLimit, long checkInterval) {
114 super(objectSizeEstimator, timer, writeLimit, readLimit,
115 checkInterval);
116 }
117
118 public ChannelTrafficShapingHandler(
119 ObjectSizeEstimator objectSizeEstimator, Timer timer,
120 long writeLimit, long readLimit, long checkInterval, long maxTime) {
121 super(objectSizeEstimator, timer, writeLimit, readLimit,
122 checkInterval, maxTime);
123 }
124
125 public ChannelTrafficShapingHandler(
126 ObjectSizeEstimator objectSizeEstimator, Timer timer,
127 long writeLimit, long readLimit) {
128 super(objectSizeEstimator, timer, writeLimit, readLimit);
129 }
130
131 public ChannelTrafficShapingHandler(
132 ObjectSizeEstimator objectSizeEstimator, Timer timer,
133 long checkInterval) {
134 super(objectSizeEstimator, timer, checkInterval);
135 }
136
137 public ChannelTrafficShapingHandler(
138 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
139 super(objectSizeEstimator, timer);
140 }
141
142 private static final class ToSend {
143 final long relativeTimeAction;
144 final MessageEvent toSend;
145
146 private ToSend(final long delay, final MessageEvent toSend) {
147 relativeTimeAction = delay;
148 this.toSend = toSend;
149 }
150 }
151
152 @Override
153 void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long size,
154 final long delay, final long now) throws Exception {
155 if (ctx == null) {
156 this.ctx = ctx;
157 }
158 final ToSend newToSend;
159 Channel channel = ctx.getChannel();
160 synchronized (this) {
161 if (delay == 0 && messagesQueue.isEmpty()) {
162 if (! channel.isConnected()) {
163
164 return;
165 }
166 if (trafficCounter != null) {
167 trafficCounter.bytesRealWriteFlowControl(size);
168 }
169 ctx.sendDownstream(evt);
170 return;
171 }
172 if (timer == null) {
173
174 Thread.sleep(delay);
175 if (! channel.isConnected()) {
176
177 return;
178 }
179 if (trafficCounter != null) {
180 trafficCounter.bytesRealWriteFlowControl(size);
181 }
182 ctx.sendDownstream(evt);
183 return;
184 }
185 if (! channel.isConnected()) {
186
187 return;
188 }
189 newToSend = new ToSend(delay + now, evt);
190 messagesQueue.add(newToSend);
191 queueSize += size;
192 checkWriteSuspend(ctx, delay, queueSize);
193 }
194 final long futureNow = newToSend.relativeTimeAction;
195 writeTimeout = timer.newTimeout(new TimerTask() {
196 public void run(Timeout timeout) throws Exception {
197 sendAllValid(ctx, futureNow);
198 }
199 }, delay + 1, TimeUnit.MILLISECONDS);
200 }
201
202 private void sendAllValid(ChannelHandlerContext ctx, final long now) throws Exception {
203 Channel channel = ctx.getChannel();
204 if (! channel.isConnected()) {
205
206 return;
207 }
208 synchronized (this) {
209 while (!messagesQueue.isEmpty()) {
210 ToSend newToSend = messagesQueue.remove(0);
211 if (newToSend.relativeTimeAction <= now) {
212 long size = calculateSize(newToSend.toSend.getMessage());
213 if (trafficCounter != null) {
214 trafficCounter.bytesRealWriteFlowControl(size);
215 }
216 queueSize -= size;
217 if (! channel.isConnected()) {
218
219 break;
220 }
221 ctx.sendDownstream(newToSend.toSend);
222 } else {
223 messagesQueue.add(0, newToSend);
224 break;
225 }
226 }
227 if (messagesQueue.isEmpty()) {
228 releaseWriteSuspended(ctx);
229 }
230 }
231 }
232
233
234
235
236 public long queueSize() {
237 return queueSize;
238 }
239
240 @Override
241 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
242 throws Exception {
243 if (trafficCounter != null) {
244 trafficCounter.stop();
245 }
246 synchronized (this) {
247 messagesQueue.clear();
248 }
249 if (writeTimeout != null) {
250 writeTimeout.cancel();
251 }
252 super.channelClosed(ctx, e);
253 }
254
255 @Override
256 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
257 throws Exception {
258 this.ctx = ctx;
259
260 ReadWriteStatus rws = checkAttachment(ctx);
261 rws.readSuspend = true;
262 ctx.getChannel().setReadable(false);
263 if (trafficCounter == null) {
264
265 if (timer != null) {
266 trafficCounter = new TrafficCounter(this, timer, "ChannelTC" +
267 ctx.getChannel().getId(), checkInterval);
268 }
269 }
270 if (trafficCounter != null) {
271 trafficCounter.start();
272 }
273 rws.readSuspend = false;
274 ctx.getChannel().setReadable(true);
275 super.channelConnected(ctx, e);
276 }
277
278 @Override
279 public void releaseExternalResources() {
280 Channel channel = ctx.getChannel();
281 synchronized (this) {
282 if (ctx != null && ctx.getChannel().isConnected()) {
283 for (ToSend toSend : messagesQueue) {
284 if (! channel.isConnected()) {
285
286 break;
287 }
288 ctx.sendDownstream(toSend.toSend);
289 }
290 }
291 messagesQueue.clear();
292 }
293 if (writeTimeout != null) {
294 writeTimeout.cancel();
295 }
296 super.releaseExternalResources();
297 }
298
299 }