1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.traffic;
17
18 import java.util.LinkedList;
19 import java.util.List;
20 import java.util.concurrent.TimeUnit;
21
22 import org.jboss.netty.channel.ChannelHandlerContext;
23 import org.jboss.netty.channel.ChannelPipelineFactory;
24 import org.jboss.netty.channel.ChannelStateEvent;
25 import org.jboss.netty.channel.MessageEvent;
26 import org.jboss.netty.handler.execution.ExecutionHandler;
27 import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
28 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
29 import org.jboss.netty.util.ObjectSizeEstimator;
30 import org.jboss.netty.util.Timeout;
31 import org.jboss.netty.util.Timer;
32 import org.jboss.netty.util.TimerTask;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
70 private List<ToSend> messagesQueue = new LinkedList<ToSend>();
71 private volatile Timeout writeTimeout;
72
73 public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
74 long readLimit, long checkInterval) {
75 super(timer, writeLimit, readLimit, checkInterval);
76 }
77
78 public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
79 long readLimit, long checkInterval, long maxTime) {
80 super(timer, writeLimit, readLimit, checkInterval, maxTime);
81 }
82
83 public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
84 long readLimit) {
85 super(timer, writeLimit, readLimit);
86 }
87
88 public ChannelTrafficShapingHandler(Timer timer, long checkInterval) {
89 super(timer, checkInterval);
90 }
91
92 public ChannelTrafficShapingHandler(Timer timer) {
93 super(timer);
94 }
95
96 public ChannelTrafficShapingHandler(
97 ObjectSizeEstimator objectSizeEstimator, Timer timer,
98 long writeLimit, long readLimit, long checkInterval) {
99 super(objectSizeEstimator, timer, writeLimit, readLimit,
100 checkInterval);
101 }
102
103 public ChannelTrafficShapingHandler(
104 ObjectSizeEstimator objectSizeEstimator, Timer timer,
105 long writeLimit, long readLimit, long checkInterval, long maxTime) {
106 super(objectSizeEstimator, timer, writeLimit, readLimit,
107 checkInterval, maxTime);
108 }
109
110 public ChannelTrafficShapingHandler(
111 ObjectSizeEstimator objectSizeEstimator, Timer timer,
112 long writeLimit, long readLimit) {
113 super(objectSizeEstimator, timer, writeLimit, readLimit);
114 }
115
116 public ChannelTrafficShapingHandler(
117 ObjectSizeEstimator objectSizeEstimator, Timer timer,
118 long checkInterval) {
119 super(objectSizeEstimator, timer, checkInterval);
120 }
121
122 public ChannelTrafficShapingHandler(
123 ObjectSizeEstimator objectSizeEstimator, Timer timer) {
124 super(objectSizeEstimator, timer);
125 }
126
127 private static final class ToSend {
128 final long date;
129 final MessageEvent toSend;
130
131 private ToSend(final long delay, final MessageEvent toSend) {
132 this.date = System.currentTimeMillis() + delay;
133 this.toSend = toSend;
134 }
135 }
136
137 @Override
138 protected synchronized void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
139 throws Exception {
140 if (delay == 0 && messagesQueue.isEmpty()) {
141 internalSubmitWrite(ctx, evt);
142 return;
143 }
144 if (timer == null) {
145
146 Thread.sleep(delay);
147 internalSubmitWrite(ctx, evt);
148 return;
149 }
150 final ToSend newToSend = new ToSend(delay, evt);
151 messagesQueue.add(newToSend);
152 writeTimeout = timer.newTimeout(new TimerTask() {
153 public void run(Timeout timeout) throws Exception {
154 sendAllValid(ctx);
155 }
156 }, delay + 1, TimeUnit.MILLISECONDS);
157 }
158
159 private synchronized void sendAllValid(ChannelHandlerContext ctx) throws Exception {
160 while (!messagesQueue.isEmpty()) {
161 ToSend newToSend = messagesQueue.remove(0);
162 if (newToSend.date <= System.currentTimeMillis()) {
163 internalSubmitWrite(ctx, newToSend.toSend);
164 } else {
165 messagesQueue.add(0, newToSend);
166 break;
167 }
168 }
169 }
170
171 @Override
172 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
173 throws Exception {
174 if (trafficCounter != null) {
175 trafficCounter.stop();
176 }
177 messagesQueue.clear();
178 if (writeTimeout != null) {
179 writeTimeout.cancel();
180 }
181 super.channelClosed(ctx, e);
182 }
183
184 @Override
185 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
186 throws Exception {
187
188 ctx.setAttachment(Boolean.TRUE);
189 ctx.getChannel().setReadable(false);
190 if (trafficCounter == null) {
191
192 if (timer != null) {
193 trafficCounter = new TrafficCounter(this, timer, "ChannelTC" +
194 ctx.getChannel().getId(), checkInterval);
195 }
196 }
197 if (trafficCounter != null) {
198 trafficCounter.start();
199 }
200
201 ctx.setAttachment(null);
202 ctx.getChannel().setReadable(true);
203 super.channelConnected(ctx, e);
204 }
205
206 }