1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.traffic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandler.Sharable;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.util.concurrent.EventExecutor;
24 import io.netty.util.internal.ObjectUtil;
25 import io.netty.util.internal.PlatformDependent;
26
27 import java.util.ArrayDeque;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicLong;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @Sharable
79 public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
80
81
82
83 private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
84
85
86
87
88 private final AtomicLong queuesSize = new AtomicLong();
89
90
91
92
93
94 long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100;
95
96 private static final class PerChannel {
97 ArrayDeque<ToSend> messagesQueue;
98 long queueSize;
99 long lastWriteTimestamp;
100 long lastReadTimestamp;
101 }
102
103
104
105
106 void createGlobalTrafficCounter(ScheduledExecutorService executor) {
107 TrafficCounter tc = new TrafficCounter(this,
108 ObjectUtil.checkNotNull(executor, "executor"),
109 "GlobalTC",
110 checkInterval);
111
112 setTrafficCounter(tc);
113 tc.start();
114 }
115
116 @Override
117 protected int userDefinedWritabilityIndex() {
118 return AbstractTrafficShapingHandler.GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
119 }
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
137 long checkInterval, long maxTime) {
138 super(writeLimit, readLimit, checkInterval, maxTime);
139 createGlobalTrafficCounter(executor);
140 }
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
157 long readLimit, long checkInterval) {
158 super(writeLimit, readLimit, checkInterval);
159 createGlobalTrafficCounter(executor);
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
174 long readLimit) {
175 super(writeLimit, readLimit);
176 createGlobalTrafficCounter(executor);
177 }
178
179
180
181
182
183
184
185
186
187
188
189 public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
190 super(checkInterval);
191 createGlobalTrafficCounter(executor);
192 }
193
194
195
196
197
198
199
200
201 public GlobalTrafficShapingHandler(EventExecutor executor) {
202 createGlobalTrafficCounter(executor);
203 }
204
205
206
207
208 public long getMaxGlobalWriteSize() {
209 return maxGlobalWriteSize;
210 }
211
212
213
214
215
216
217
218
219
220
221
222
223 public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
224 this.maxGlobalWriteSize = maxGlobalWriteSize;
225 }
226
227
228
229
230 public long queuesSize() {
231 return queuesSize.get();
232 }
233
234
235
236
237 public final void release() {
238 trafficCounter.stop();
239 }
240
241 private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
242
243 Channel channel = ctx.channel();
244 Integer key = channel.hashCode();
245 PerChannel perChannel = channelQueues.get(key);
246 if (perChannel == null) {
247 perChannel = new PerChannel();
248 perChannel.messagesQueue = new ArrayDeque<ToSend>();
249 perChannel.queueSize = 0L;
250 perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
251 perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
252 channelQueues.put(key, perChannel);
253 }
254 return perChannel;
255 }
256
257 @Override
258 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
259 getOrSetPerChannel(ctx);
260 super.handlerAdded(ctx);
261 }
262
263 @Override
264 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
265 Channel channel = ctx.channel();
266 Integer key = channel.hashCode();
267 PerChannel perChannel = channelQueues.remove(key);
268 if (perChannel != null) {
269
270 synchronized (perChannel) {
271 if (channel.isActive()) {
272 for (ToSend toSend : perChannel.messagesQueue) {
273 long size = calculateSize(toSend.toSend);
274 trafficCounter.bytesRealWriteFlowControl(size);
275 perChannel.queueSize -= size;
276 queuesSize.addAndGet(-size);
277 ctx.write(toSend.toSend, toSend.promise);
278 }
279 } else {
280 queuesSize.addAndGet(-perChannel.queueSize);
281 for (ToSend toSend : perChannel.messagesQueue) {
282 if (toSend.toSend instanceof ByteBuf) {
283 ((ByteBuf) toSend.toSend).release();
284 }
285 }
286 }
287 perChannel.messagesQueue.clear();
288 }
289 }
290 releaseWriteSuspended(ctx);
291 releaseReadSuspended(ctx);
292 super.handlerRemoved(ctx);
293 }
294
295 @Override
296 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
297 Integer key = ctx.channel().hashCode();
298 PerChannel perChannel = channelQueues.get(key);
299 if (perChannel != null) {
300 if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
301 wait = maxTime;
302 }
303 }
304 return wait;
305 }
306
307 @Override
308 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
309 Integer key = ctx.channel().hashCode();
310 PerChannel perChannel = channelQueues.get(key);
311 if (perChannel != null) {
312 perChannel.lastReadTimestamp = now;
313 }
314 }
315
316 private static final class ToSend {
317 final long relativeTimeAction;
318 final Object toSend;
319 final long size;
320 final ChannelPromise promise;
321
322 private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
323 relativeTimeAction = delay;
324 this.toSend = toSend;
325 this.size = size;
326 this.promise = promise;
327 }
328 }
329
330 @Override
331 void submitWrite(final ChannelHandlerContext ctx, final Object msg,
332 final long size, final long writedelay, final long now,
333 final ChannelPromise promise) {
334 Channel channel = ctx.channel();
335 Integer key = channel.hashCode();
336 PerChannel perChannel = channelQueues.get(key);
337 if (perChannel == null) {
338
339
340 perChannel = getOrSetPerChannel(ctx);
341 }
342 final ToSend newToSend;
343 long delay = writedelay;
344 boolean globalSizeExceeded = false;
345
346 synchronized (perChannel) {
347 if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
348 trafficCounter.bytesRealWriteFlowControl(size);
349 ctx.write(msg, promise);
350 perChannel.lastWriteTimestamp = now;
351 return;
352 }
353 if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
354 delay = maxTime;
355 }
356 newToSend = new ToSend(delay + now, msg, size, promise);
357 perChannel.messagesQueue.addLast(newToSend);
358 perChannel.queueSize += size;
359 queuesSize.addAndGet(size);
360 checkWriteSuspend(ctx, delay, perChannel.queueSize);
361 if (queuesSize.get() > maxGlobalWriteSize) {
362 globalSizeExceeded = true;
363 }
364 }
365 if (globalSizeExceeded) {
366 setUserDefinedWritability(ctx, false);
367 }
368 final long futureNow = newToSend.relativeTimeAction;
369 final PerChannel forSchedule = perChannel;
370 ctx.executor().schedule(new Runnable() {
371 @Override
372 public void run() {
373 sendAllValid(ctx, forSchedule, futureNow);
374 }
375 }, delay, TimeUnit.MILLISECONDS);
376 }
377
378 private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
379
380 synchronized (perChannel) {
381 ToSend newToSend = perChannel.messagesQueue.pollFirst();
382 for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
383 if (newToSend.relativeTimeAction <= now) {
384 long size = newToSend.size;
385 trafficCounter.bytesRealWriteFlowControl(size);
386 perChannel.queueSize -= size;
387 queuesSize.addAndGet(-size);
388 ctx.write(newToSend.toSend, newToSend.promise);
389 perChannel.lastWriteTimestamp = now;
390 } else {
391 perChannel.messagesQueue.addFirst(newToSend);
392 break;
393 }
394 }
395 if (perChannel.messagesQueue.isEmpty()) {
396 releaseWriteSuspended(ctx);
397 }
398 }
399 ctx.flush();
400 }
401 }