1
2
3
4
5
6
7
8
9
10
11
12
13 package io.netty.handler.traffic;
14
15 import io.netty.buffer.ByteBuf;
16 import io.netty.buffer.ByteBufHolder;
17 import io.netty.channel.ChannelDuplexHandler;
18 import io.netty.channel.ChannelConfig;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.util.Attribute;
23 import io.netty.util.AttributeKey;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26
27 import java.util.concurrent.TimeUnit;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler {
/** Logger used by this handler and by its nested reopen task for debug traces. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);

/**
 * Default value of {@code checkInterval}, used by the constructors that do not take one.
 * NOTE(review): presumably milliseconds -- confirm against {@code TrafficCounter}.
 */
public static final long DEFAULT_CHECK_INTERVAL = 1000;

/**
 * Default value of {@code maxTime}, the bound handed to the traffic counter when it
 * computes a wait time. The computed wait is scheduled in milliseconds, so this is
 * presumably milliseconds as well -- confirm against {@code TrafficCounter}.
 */
public static final long DEFAULT_MAX_TIME = 15000;

/**
 * Default value of {@code maxWriteSize} (4 * 1024 * 1024).
 * NOTE(review): presumably a byte count, since sizes come from {@code readableBytes()}.
 */
static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;

/**
 * Smallest computed wait that triggers throttling; waits below this value are ignored by
 * {@code channelRead} and {@code write}. The read path schedules the wait in milliseconds.
 */
static final long MINIMAL_WAIT = 10;
69
70
71
72
/**
 * Counter consulted for wait times in {@code channelRead} and {@code write}. It is assigned
 * through {@code setTrafficCounter}; the configuration methods tolerate it being {@code null}.
 */
protected TrafficCounter trafficCounter;

/**
 * Limit passed to {@code TrafficCounter.writeTimeToWait}.
 * NOTE(review): unit not visible here (presumably bytes per second) -- confirm.
 */
private volatile long writeLimit;

/**
 * Limit passed to {@code TrafficCounter.readTimeToWait}.
 * NOTE(review): unit not visible here (presumably bytes per second) -- confirm.
 */
private volatile long readLimit;

/**
 * Maximum time handed to the counter's wait computations. The constructor and
 * {@code setMaxTimeWait} both reject non-positive values.
 */
protected volatile long maxTime = DEFAULT_MAX_TIME;

/** Check interval; forwarded to {@code TrafficCounter.configure} whenever it is changed. */
protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL;

/** Attribute recording whether this handler has suspended reads for a given context. */
static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
        .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
/** Attribute caching the reopen task so that it is created only once per context. */
static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
        .getName() + ".REOPEN_TASK");

/**
 * Delay threshold: {@code checkWriteSuspend} marks the channel as not writable when the
 * delay exceeds this value. Defaults to four times {@code DEFAULT_CHECK_INTERVAL}.
 */
volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL;

/**
 * Queue-size threshold: {@code checkWriteSuspend} marks the channel as not writable when the
 * queue size exceeds this value.
 */
volatile long maxWriteSize = DEFAULT_MAX_SIZE;
108
109
110
111
112
/**
 * Index passed to {@code ChannelOutboundBuffer.setUserDefinedWritability}. It is captured once,
 * in the constructor, from {@code userDefinedWritabilityIndex()}.
 */
final int userDefinedWritabilityIndex;

/** Index returned by this class's own {@code userDefinedWritabilityIndex()} implementation. */
static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;

/**
 * Index not referenced in this class.
 * NOTE(review): by its name, intended for a global-scope handler -- confirm in subclasses.
 */
static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;

/**
 * Index not referenced in this class.
 * NOTE(review): by its name, intended for a global-plus-channel handler -- confirm in subclasses.
 */
static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
129
130
131
132
133
134 void setTrafficCounter(TrafficCounter newTrafficCounter) {
135 trafficCounter = newTrafficCounter;
136 }
137
138
139
140
141
142
143
144
145 protected int userDefinedWritabilityIndex() {
146 return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
147 }
148
149
150
151
152
153
154
155
156
157
158
159
160
/**
 * Creates a handler with explicit limits, check interval and maximum wait time.
 *
 * @param writeLimit    limit later passed to the counter's write wait computation
 * @param readLimit     limit later passed to the counter's read wait computation
 * @param checkInterval check interval stored for the traffic counter
 * @param maxTime       maximum wait time; must be strictly positive
 * @throws IllegalArgumentException if {@code maxTime} is zero or negative
 */
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
    // maxTime is the only validated argument; fail before any state is assigned.
    if (!(maxTime > 0)) {
        throw new IllegalArgumentException("maxTime must be positive");
    }

    // Resolve the (overridable) index first, exactly as before, then store the settings.
    this.userDefinedWritabilityIndex = userDefinedWritabilityIndex();
    this.maxTime = maxTime;
    this.checkInterval = checkInterval;
    this.readLimit = readLimit;
    this.writeLimit = writeLimit;
}
172
173
174
175
176
177
178
179
180
181
182
/**
 * Creates a handler with explicit limits and check interval, using
 * {@code DEFAULT_MAX_TIME} as the maximum wait time.
 *
 * @param writeLimit    limit later passed to the counter's write wait computation
 * @param readLimit     limit later passed to the counter's read wait computation
 * @param checkInterval check interval stored for the traffic counter
 */
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
    this(writeLimit, readLimit, checkInterval, AbstractTrafficShapingHandler.DEFAULT_MAX_TIME);
}
186
187
188
189
190
191
192
193
194
195
/**
 * Creates a handler with explicit limits, using {@code DEFAULT_CHECK_INTERVAL} and
 * {@code DEFAULT_MAX_TIME} for the remaining settings.
 *
 * @param writeLimit limit later passed to the counter's write wait computation
 * @param readLimit  limit later passed to the counter's read wait computation
 */
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
    this(writeLimit, readLimit,
            AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL,
            AbstractTrafficShapingHandler.DEFAULT_MAX_TIME);
}
199
200
201
202
203
/**
 * Creates a handler with both limits set to {@code 0} and the default check interval and
 * maximum wait time.
 */
protected AbstractTrafficShapingHandler() {
    this(0L, 0L,
            AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL,
            AbstractTrafficShapingHandler.DEFAULT_MAX_TIME);
}
207
208
209
210
211
212
213
214
215
/**
 * Creates a handler with both limits set to {@code 0}, the given check interval and
 * {@code DEFAULT_MAX_TIME} as the maximum wait time.
 *
 * @param checkInterval check interval stored for the traffic counter
 */
protected AbstractTrafficShapingHandler(long checkInterval) {
    this(0L, 0L, checkInterval, AbstractTrafficShapingHandler.DEFAULT_MAX_TIME);
}
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235 public void configure(long newWriteLimit, long newReadLimit, long newCheckInterval) {
236 configure(newWriteLimit, newReadLimit);
237 configure(newCheckInterval);
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253 public void configure(long newWriteLimit, long newReadLimit) {
254 writeLimit = newWriteLimit;
255 readLimit = newReadLimit;
256 if (trafficCounter != null) {
257 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
258 }
259 }
260
261
262
263
264
265
266
267 public void configure(long newCheckInterval) {
268 checkInterval = newCheckInterval;
269 if (trafficCounter != null) {
270 trafficCounter.configure(checkInterval);
271 }
272 }
273
274
275
276
277 public long getWriteLimit() {
278 return writeLimit;
279 }
280
281
282
283
284
285
286
287
288
289
290 public void setWriteLimit(long writeLimit) {
291 this.writeLimit = writeLimit;
292 if (trafficCounter != null) {
293 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
294 }
295 }
296
297
298
299
300 public long getReadLimit() {
301 return readLimit;
302 }
303
304
305
306
307
308
309
310
311
312
313 public void setReadLimit(long readLimit) {
314 this.readLimit = readLimit;
315 if (trafficCounter != null) {
316 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
317 }
318 }
319
320
321
322
323 public long getCheckInterval() {
324 return checkInterval;
325 }
326
327
328
329
330 public void setCheckInterval(long checkInterval) {
331 this.checkInterval = checkInterval;
332 if (trafficCounter != null) {
333 trafficCounter.configure(checkInterval);
334 }
335 }
336
337
338
339
340
341
342
343
344
345
346
347
348 public void setMaxTimeWait(long maxTime) {
349 if (maxTime <= 0) {
350 throw new IllegalArgumentException("maxTime must be positive");
351 }
352 this.maxTime = maxTime;
353 }
354
355
356
357
358 public long getMaxTimeWait() {
359 return maxTime;
360 }
361
362
363
364
365 public long getMaxWriteDelay() {
366 return maxWriteDelay;
367 }
368
369
370
371
372
373
374
375
376
377
378
379 public void setMaxWriteDelay(long maxWriteDelay) {
380 if (maxWriteDelay <= 0) {
381 throw new IllegalArgumentException("maxWriteDelay must be positive");
382 }
383 this.maxWriteDelay = maxWriteDelay;
384 }
385
386
387
388
389 public long getMaxWriteSize() {
390 return maxWriteSize;
391 }
392
393
394
395
396
397
398
399
400
401
402
403
404
405 public void setMaxWriteSize(long maxWriteSize) {
406 this.maxWriteSize = maxWriteSize;
407 }
408
409
410
411
412
413
414
415
/**
 * Accounting hook for subclasses; this base implementation does nothing.
 * NOTE(review): the caller is not visible in this class -- presumably invoked by the
 * {@code TrafficCounter} when it completes an accounting pass; confirm there.
 *
 * @param counter the counter reporting its accounting
 */
protected void doAccounting(TrafficCounter counter) {
    // Intentionally empty: subclasses override this to react to accounting.
}
419
420
421
422
423 static final class ReopenReadTimerTask implements Runnable {
424 final ChannelHandlerContext ctx;
425
426 ReopenReadTimerTask(ChannelHandlerContext ctx) {
427 this.ctx = ctx;
428 }
429
430 @Override
431 public void run() {
432 ChannelConfig config = ctx.channel().config();
433 if (!config.isAutoRead() && isHandlerActive(ctx)) {
434
435
436 if (logger.isDebugEnabled()) {
437 logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
438 isHandlerActive(ctx));
439 }
440 ctx.attr(READ_SUSPENDED).set(false);
441 } else {
442
443 if (logger.isDebugEnabled()) {
444 if (config.isAutoRead() && !isHandlerActive(ctx)) {
445 logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
446 isHandlerActive(ctx));
447 } else {
448 logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
449 + isHandlerActive(ctx));
450 }
451 }
452 ctx.attr(READ_SUSPENDED).set(false);
453 config.setAutoRead(true);
454 ctx.channel().read();
455 }
456 if (logger.isDebugEnabled()) {
457 logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
458 + isHandlerActive(ctx));
459 }
460 }
461 }
462
463
464
465
466 void releaseReadSuspended(ChannelHandlerContext ctx) {
467 ctx.attr(READ_SUSPENDED).set(false);
468 ctx.channel().config().setAutoRead(true);
469 }
470
471 @Override
472 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
473 long size = calculateSize(msg);
474 long now = TrafficCounter.milliSecondFromNano();
475 if (size > 0) {
476
477 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
478 wait = checkWaitReadTime(ctx, wait, now);
479 if (wait >= MINIMAL_WAIT) {
480
481
482 ChannelConfig config = ctx.channel().config();
483 if (logger.isDebugEnabled()) {
484 logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
485 + isHandlerActive(ctx));
486 }
487 if (config.isAutoRead() && isHandlerActive(ctx)) {
488 config.setAutoRead(false);
489 ctx.attr(READ_SUSPENDED).set(true);
490
491
492 Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
493 Runnable reopenTask = attr.get();
494 if (reopenTask == null) {
495 reopenTask = new ReopenReadTimerTask(ctx);
496 attr.set(reopenTask);
497 }
498 ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
499 if (logger.isDebugEnabled()) {
500 logger.debug("Suspend final status => " + config.isAutoRead() + ':'
501 + isHandlerActive(ctx) + " will reopened at: " + wait);
502 }
503 }
504 }
505 }
506 informReadOperation(ctx, now);
507 ctx.fireChannelRead(msg);
508 }
509
510
511
512
513
514
515
/**
 * Hook called by {@code channelRead} to adjust the wait computed by the traffic counter.
 * This base implementation returns the wait unchanged.
 *
 * @param ctx  the handler context
 * @param wait the wait computed by the counter
 * @param now  the timestamp taken at the start of {@code channelRead}
 * @return the wait to apply
 */
long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
    // No adjustment by default.
    return wait;
}
520
521
522
523
524
/**
 * Hook called by {@code channelRead} for every inbound message, just before the message is
 * passed on to the next handler. This base implementation does nothing.
 *
 * @param ctx the handler context
 * @param now the timestamp taken at the start of {@code channelRead}
 */
void informReadOperation(final ChannelHandlerContext ctx, final long now) {
    // Intentionally empty by default.
}
528
529 protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
530 Boolean suspended = ctx.attr(READ_SUSPENDED).get();
531 return suspended == null || Boolean.FALSE.equals(suspended);
532 }
533
534 @Override
535 public void read(ChannelHandlerContext ctx) {
536 if (isHandlerActive(ctx)) {
537
538 ctx.read();
539 }
540 }
541
542 @Override
543 public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
544 throws Exception {
545 long size = calculateSize(msg);
546 long now = TrafficCounter.milliSecondFromNano();
547 if (size > 0) {
548
549 long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
550 if (wait >= MINIMAL_WAIT) {
551 if (logger.isDebugEnabled()) {
552 logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
553 + isHandlerActive(ctx));
554 }
555 submitWrite(ctx, msg, size, wait, now, promise);
556 return;
557 }
558 }
559
560 submitWrite(ctx, msg, size, 0, now, promise);
561 }
562
563 @Deprecated
564 protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
565 final long delay, final ChannelPromise promise) {
566 submitWrite(ctx, msg, calculateSize(msg),
567 delay, TrafficCounter.milliSecondFromNano(), promise);
568 }
569
/**
 * Performs or queues the actual write; implemented by subclasses. {@code write} calls this
 * for every outbound message, with {@code delay} set to {@code 0} when no throttling applies.
 *
 * @param ctx     the handler context
 * @param msg     the outbound message
 * @param size    the value {@code calculateSize(msg)} returned ({@code -1} when unknown)
 * @param delay   the delay to apply before writing, or {@code 0}
 * @param now     the timestamp taken by the caller
 * @param promise the promise to complete for this write
 */
abstract void submitWrite(
        ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
572
573 @Override
574 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
575 setUserDefinedWritability(ctx, true);
576 super.channelRegistered(ctx);
577 }
578
579 void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
580 ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
581 if (cob != null) {
582 cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
583 }
584 }
585
586
587
588
589
590
591
592 void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
593 if (queueSize > maxWriteSize || delay > maxWriteDelay) {
594 setUserDefinedWritability(ctx, false);
595 }
596 }
597
598
599
600 void releaseWriteSuspended(ChannelHandlerContext ctx) {
601 setUserDefinedWritability(ctx, true);
602 }
603
604
605
606
607
608 public TrafficCounter trafficCounter() {
609 return trafficCounter;
610 }
611
612 @Override
613 public String toString() {
614 StringBuilder builder = new StringBuilder(290)
615 .append("TrafficShaping with Write Limit: ").append(writeLimit)
616 .append(" Read Limit: ").append(readLimit)
617 .append(" CheckInterval: ").append(checkInterval)
618 .append(" maxDelay: ").append(maxWriteDelay)
619 .append(" maxSize: ").append(maxWriteSize)
620 .append(" and Counter: ");
621 if (trafficCounter != null) {
622 builder.append(trafficCounter);
623 } else {
624 builder.append("none");
625 }
626 return builder.toString();
627 }
628
629
630
631
632
633
634
635
636
637
638 protected long calculateSize(Object msg) {
639 if (msg instanceof ByteBuf) {
640 return ((ByteBuf) msg).readableBytes();
641 }
642 if (msg instanceof ByteBufHolder) {
643 return ((ByteBufHolder) msg).content().readableBytes();
644 }
645 return -1;
646 }
647 }