1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import java.util.HashMap;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.TimeUnit;
23
24 import org.jboss.netty.channel.ChannelHandler.Sharable;
25 import org.jboss.netty.channel.ChannelHandlerContext;
26 import org.jboss.netty.channel.ChannelStateEvent;
27 import org.jboss.netty.channel.MessageEvent;
28 import org.jboss.netty.handler.execution.ExecutionHandler;
29 import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
30 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
31 import org.jboss.netty.util.ObjectSizeEstimator;
32 import org.jboss.netty.util.Timeout;
33 import org.jboss.netty.util.Timer;
34 import org.jboss.netty.util.TimerTask;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 @Sharable
75 public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
76 private Map<Integer, List<ToSend>> messagesQueues = new HashMap<Integer, List<ToSend>>();
77
78
79
80
81 void createGlobalTrafficCounter() {
82 TrafficCounter tc;
83 if (timer != null) {
84 tc = new TrafficCounter(this, timer, "GlobalTC",
85 checkInterval);
86 setTrafficCounter(tc);
87 tc.start();
88 }
89 }
90
91 public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
92 long readLimit, long checkInterval) {
93 super(timer, writeLimit, readLimit, checkInterval);
94 createGlobalTrafficCounter();
95 }
96
97 public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
98 long readLimit, long checkInterval, long maxTime) {
99 super(timer, writeLimit, readLimit, checkInterval, maxTime);
100 createGlobalTrafficCounter();
101 }
102
103 public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
104 long readLimit) {
105 super(timer, writeLimit, readLimit);
106 createGlobalTrafficCounter();
107 }
108
109 public GlobalTrafficShapingHandler(Timer timer, long checkInterval) {
110 super(timer, checkInterval);
111 createGlobalTrafficCounter();
112 }
113
114 public GlobalTrafficShapingHandler(Timer timer) {
115 super(timer);
116 createGlobalTrafficCounter();
117 }
118
119 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
120 Timer timer, long writeLimit, long readLimit,
121 long checkInterval) {
122 super(objectSizeEstimator, timer, writeLimit, readLimit,
123 checkInterval);
124 createGlobalTrafficCounter();
125 }
126
127 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
128 Timer timer, long writeLimit, long readLimit,
129 long checkInterval, long maxTime) {
130 super(objectSizeEstimator, timer, writeLimit, readLimit,
131 checkInterval, maxTime);
132 createGlobalTrafficCounter();
133 }
134
135 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
136 Timer timer, long writeLimit, long readLimit) {
137 super(objectSizeEstimator, timer, writeLimit, readLimit);
138 createGlobalTrafficCounter();
139 }
140
141 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
142 Timer timer, long checkInterval) {
143 super(objectSizeEstimator, timer, checkInterval);
144 createGlobalTrafficCounter();
145 }
146
147 public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
148 Timer timer) {
149 super(objectSizeEstimator, timer);
150 createGlobalTrafficCounter();
151 }
152
153 private static final class ToSend {
154 final long date;
155 final MessageEvent toSend;
156
157 private ToSend(final long delay, final MessageEvent toSend) {
158 this.date = System.currentTimeMillis() + delay;
159 this.toSend = toSend;
160 }
161 }
162
163 @Override
164 protected synchronized void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
165 throws Exception {
166 Integer key = ctx.getChannel().getId();
167 List<ToSend> messagesQueue = messagesQueues.get(key);
168 if (delay == 0 && (messagesQueue == null || messagesQueue.isEmpty())) {
169 internalSubmitWrite(ctx, evt);
170 return;
171 }
172 if (timer == null) {
173
174 Thread.sleep(delay);
175 internalSubmitWrite(ctx, evt);
176 return;
177 }
178 if (messagesQueue == null) {
179 messagesQueue = new LinkedList<ToSend>();
180 messagesQueues.put(key, messagesQueue);
181 }
182 final ToSend newToSend = new ToSend(delay, evt);
183 messagesQueue.add(newToSend);
184 final List<ToSend> mqfinal = messagesQueue;
185 timer.newTimeout(new TimerTask() {
186 public void run(Timeout timeout) throws Exception {
187 sendAllValid(ctx, mqfinal);
188 }
189 }, delay + 1, TimeUnit.MILLISECONDS);
190 }
191
192 private synchronized void sendAllValid(ChannelHandlerContext ctx, final List<ToSend> messagesQueue)
193 throws Exception {
194 while (!messagesQueue.isEmpty()) {
195 ToSend newToSend = messagesQueue.remove(0);
196 if (newToSend.date <= System.currentTimeMillis()) {
197 internalSubmitWrite(ctx, newToSend.toSend);
198 } else {
199 messagesQueue.add(0, newToSend);
200 break;
201 }
202 }
203 }
204
205 @Override
206 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
207 throws Exception {
208 Integer key = ctx.getChannel().getId();
209 List<ToSend> messagesQueue = new LinkedList<ToSend>();
210 messagesQueues.put(key, messagesQueue);
211 super.channelConnected(ctx, e);
212 }
213
214 @Override
215 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
216 throws Exception {
217 Integer key = ctx.getChannel().hashCode();
218 List<ToSend> mq = messagesQueues.remove(key);
219 if (mq != null) {
220 mq.clear();
221 }
222 super.channelClosed(ctx, e);
223 }
224 }