1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelHandler.Sharable;
20 import org.jboss.netty.channel.ChannelHandlerContext;
21 import org.jboss.netty.channel.ChannelStateEvent;
22 import org.jboss.netty.channel.MessageEvent;
23 import org.jboss.netty.handler.execution.ExecutionHandler;
24 import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
25 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
26 import org.jboss.netty.util.ObjectSizeEstimator;
27 import org.jboss.netty.util.Timeout;
28 import org.jboss.netty.util.Timer;
29 import org.jboss.netty.util.TimerTask;
30
31 import java.util.LinkedList;
32 import java.util.List;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicLong;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 @Sharable
77 public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
78 private final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<Integer, PerChannel>();
79
80
81
82
83 private final AtomicLong queuesSize = new AtomicLong();
84
85
86
87
88
89 long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100;
90
91 private static final class PerChannel {
92 List<ToSend> messagesQueue;
93 ChannelHandlerContext ctx;
94 long queueSize;
95 long lastWriteTimestamp;
96 long lastReadTimestamp;
97 }
98
99
100
101 void createGlobalTrafficCounter() {
102 TrafficCounter tc;
103 if (timer != null) {
104 tc = new TrafficCounter(this, timer, "GlobalTC",
105 checkInterval);
106 setTrafficCounter(tc);
107 tc.start();
108 }
109 }
110
111 public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
112 long readLimit, long checkInterval) {
113 super(timer, writeLimit, readLimit, checkInterval);
114 createGlobalTrafficCounter();
115 }
116
117 public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
118 long readLimit, long checkInterval, long maxTime) {
119 super(timer, writeLimit, readLimit, checkInterval, maxTime);
120 createGlobalTrafficCounter();
121 }
122
123 public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
124 long readLimit) {
125 super(timer, writeLimit, readLimit);
126 createGlobalTrafficCounter();
127 }
128
129 public GlobalTrafficShapingHandler(Timer timer, long checkInterval) {
130 super(timer, checkInterval);
131 createGlobalTrafficCounter();
132 }
133
134 public GlobalTrafficShapingHandler(Timer timer) {
135 super(timer);
136 createGlobalTrafficCounter();
137 }
138
139 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
140 Timer timer, long writeLimit, long readLimit,
141 long checkInterval) {
142 super(objectSizeEstimator, timer, writeLimit, readLimit,
143 checkInterval);
144 createGlobalTrafficCounter();
145 }
146
147 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
148 Timer timer, long writeLimit, long readLimit,
149 long checkInterval, long maxTime) {
150 super(objectSizeEstimator, timer, writeLimit, readLimit,
151 checkInterval, maxTime);
152 createGlobalTrafficCounter();
153 }
154
155 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
156 Timer timer, long writeLimit, long readLimit) {
157 super(objectSizeEstimator, timer, writeLimit, readLimit);
158 createGlobalTrafficCounter();
159 }
160
161 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
162 Timer timer, long checkInterval) {
163 super(objectSizeEstimator, timer, checkInterval);
164 createGlobalTrafficCounter();
165 }
166
167 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
168 Timer timer) {
169 super(objectSizeEstimator, timer);
170 createGlobalTrafficCounter();
171 }
172
173
174
175
176 public long getMaxGlobalWriteSize() {
177 return maxGlobalWriteSize;
178 }
179
180
181
182
183
184
185 public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
186 this.maxGlobalWriteSize = maxGlobalWriteSize;
187 }
188
189
190
191
192 public long queuesSize() {
193 return queuesSize.get();
194 }
195
196 private synchronized PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
197 Integer key = ctx.getChannel().hashCode();
198 PerChannel perChannel = channelQueues.get(key);
199 if (perChannel == null) {
200 perChannel = new PerChannel();
201 perChannel.messagesQueue = new LinkedList<ToSend>();
202 perChannel.ctx = ctx;
203 perChannel.queueSize = 0L;
204 perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
205 perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
206 channelQueues.put(key, perChannel);
207 }
208 return perChannel;
209 }
210
211 private static final class ToSend {
212 final long relativeTimeAction;
213 final MessageEvent toSend;
214 final long size;
215
216 private ToSend(final long delay, final MessageEvent toSend, final long size) {
217 relativeTimeAction = delay;
218 this.toSend = toSend;
219 this.size = size;
220 }
221 }
222
223 @Override
224 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
225 Integer key = ctx.getChannel().hashCode();
226 PerChannel perChannel = channelQueues.get(key);
227 if (perChannel != null) {
228 if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
229 wait = maxTime;
230 }
231 }
232 return wait;
233 }
234 @Override
235 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
236 Integer key = ctx.getChannel().hashCode();
237 PerChannel perChannel = channelQueues.get(key);
238 if (perChannel != null) {
239 perChannel.lastReadTimestamp = now;
240 }
241 }
242
243 @Override
244 void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
245 final long size, final long writedelay, final long now)
246 throws Exception {
247 PerChannel perChannel = getOrSetPerChannel(ctx);
248 long delay;
249 final ToSend newToSend;
250 boolean globalSizeExceeded = false;
251 Channel channel = ctx.getChannel();
252 synchronized (perChannel) {
253 if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
254 if (! channel.isConnected()) {
255
256 return;
257 }
258 if (trafficCounter != null) {
259 trafficCounter.bytesRealWriteFlowControl(size);
260 }
261 ctx.sendDownstream(evt);
262 perChannel.lastWriteTimestamp = now;
263 return;
264 }
265 delay = writedelay;
266 if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
267 delay = maxTime;
268 }
269 if (timer == null) {
270
271 Thread.sleep(delay);
272 if (! ctx.getChannel().isConnected()) {
273
274 return;
275 }
276 if (trafficCounter != null) {
277 trafficCounter.bytesRealWriteFlowControl(size);
278 }
279 ctx.sendDownstream(evt);
280 perChannel.lastWriteTimestamp = now;
281 return;
282 }
283 if (! ctx.getChannel().isConnected()) {
284
285 return;
286 }
287 newToSend = new ToSend(delay + now, evt, size);
288 perChannel.messagesQueue.add(newToSend);
289 perChannel.queueSize += size;
290 queuesSize.addAndGet(size);
291 checkWriteSuspend(ctx, delay, perChannel.queueSize);
292 if (queuesSize.get() > maxGlobalWriteSize) {
293 globalSizeExceeded = true;
294 }
295 }
296 if (globalSizeExceeded) {
297 setWritable(ctx, false);
298 }
299 final long futureNow = newToSend.relativeTimeAction;
300 final PerChannel forSchedule = perChannel;
301 timer.newTimeout(new TimerTask() {
302 public void run(Timeout timeout) throws Exception {
303 sendAllValid(ctx, forSchedule, futureNow);
304 }
305 }, delay, TimeUnit.MILLISECONDS);
306 }
307
308 private void sendAllValid(ChannelHandlerContext ctx, final PerChannel perChannel, final long now)
309 throws Exception {
310 Channel channel = ctx.getChannel();
311 if (! channel.isConnected()) {
312
313 return;
314 }
315 synchronized (perChannel) {
316 while (!perChannel.messagesQueue.isEmpty()) {
317 ToSend newToSend = perChannel.messagesQueue.remove(0);
318 if (newToSend.relativeTimeAction <= now) {
319 if (! channel.isConnected()) {
320
321 break;
322 }
323 long size = newToSend.size;
324 if (trafficCounter != null) {
325 trafficCounter.bytesRealWriteFlowControl(size);
326 }
327 perChannel.queueSize -= size;
328 queuesSize.addAndGet(-size);
329 ctx.sendDownstream(newToSend.toSend);
330 perChannel.lastWriteTimestamp = now;
331 } else {
332 perChannel.messagesQueue.add(0, newToSend);
333 break;
334 }
335 }
336 if (perChannel.messagesQueue.isEmpty()) {
337 releaseWriteSuspended(ctx);
338 }
339 }
340 }
341
342 @Override
343 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
344 throws Exception {
345 getOrSetPerChannel(ctx);
346 super.channelConnected(ctx, e);
347 }
348
349 @Override
350 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
351 throws Exception {
352 Integer key = ctx.getChannel().hashCode();
353 PerChannel perChannel = channelQueues.remove(key);
354 if (perChannel != null) {
355 synchronized (perChannel) {
356 queuesSize.addAndGet(-perChannel.queueSize);
357 perChannel.messagesQueue.clear();
358 }
359 }
360 super.channelClosed(ctx, e);
361 }
362
363 @Override
364 public void releaseExternalResources() {
365 for (PerChannel perChannel : channelQueues.values()) {
366 if (perChannel != null && perChannel.ctx != null && perChannel.ctx.getChannel().isConnected()) {
367 Channel channel = perChannel.ctx.getChannel();
368 synchronized (perChannel) {
369 for (ToSend toSend : perChannel.messagesQueue) {
370 if (! channel.isConnected()) {
371
372 break;
373 }
374 perChannel.ctx.sendDownstream(toSend.toSend);
375 }
376 perChannel.messagesQueue.clear();
377 }
378 }
379 }
380 channelQueues.clear();
381 queuesSize.set(0);
382 super.releaseExternalResources();
383 }
384 }