1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.traffic;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelHandler;
21 import io.netty5.channel.ChannelHandlerContext;
22 import io.netty5.channel.ChannelOption;
23 import io.netty5.channel.FileRegion;
24 import io.netty5.util.Attribute;
25 import io.netty5.util.AttributeKey;
26 import io.netty5.util.concurrent.Future;
27 import io.netty5.util.concurrent.Promise;
28 import io.netty5.util.internal.logging.InternalLogger;
29 import io.netty5.util.internal.logging.InternalLoggerFactory;
30
31 import java.util.concurrent.TimeUnit;
32
33 import static io.netty5.util.internal.ObjectUtil.checkPositive;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public abstract class AbstractTrafficShapingHandler implements ChannelHandler {
52 private static final InternalLogger logger =
53 InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
54
55
56
57 public static final long DEFAULT_CHECK_INTERVAL = 1000;
58
59
60
61
62
63
64 public static final long DEFAULT_MAX_TIME = 15000;
65
66
67
68
69 static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
70
71
72
73
74 static final long MINIMAL_WAIT = 10;
75
76
77
78
79 protected TrafficCounter trafficCounter;
80
81
82
83
84 private volatile long writeLimit;
85
86
87
88
89 private volatile long readLimit;
90
91
92
93
94 protected volatile long maxTime = DEFAULT_MAX_TIME;
95
96
97
98
99 protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL;
100
101 static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
102 .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
103 static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
104 .getName() + ".REOPEN_TASK");
105
106
107
108
109 volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL;
110
111
112
113 volatile long maxWriteSize = DEFAULT_MAX_SIZE;
114
115
116
117
118
119 final int userDefinedWritabilityIndex;
120
121
122
123
124 static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
125
126
127
128
129 static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
130
131
132
133
134 static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
135
136
137
138
139
140 void setTrafficCounter(TrafficCounter newTrafficCounter) {
141 trafficCounter = newTrafficCounter;
142 }
143
144
145
146
147
148
149
150
151 protected int userDefinedWritabilityIndex() {
152 return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
153 }
154
155
156
157
158
159
160
161
162
163
164
165
166
167 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
168 this.maxTime = checkPositive(maxTime, "maxTime");
169
170 userDefinedWritabilityIndex = userDefinedWritabilityIndex();
171 this.writeLimit = writeLimit;
172 this.readLimit = readLimit;
173 this.checkInterval = checkInterval;
174 }
175
176
177
178
179
180
181
182
183
184
185
186 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
187 this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
188 }
189
190
191
192
193
194
195
196
197
198
199 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
200 this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
201 }
202
203
204
205
206
207 protected AbstractTrafficShapingHandler() {
208 this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
209 }
210
211
212
213
214
215
216
217
218
219 protected AbstractTrafficShapingHandler(long checkInterval) {
220 this(0, 0, checkInterval, DEFAULT_MAX_TIME);
221 }
222
223
224
225
226
227
228
229
230
231
232
233
234
235 public void configure(long newWriteLimit, long newReadLimit,
236 long newCheckInterval) {
237 configure(newWriteLimit, newReadLimit);
238 configure(newCheckInterval);
239 }
240
241
242
243
244
245
246
247
248
249
250
251
252 public void configure(long newWriteLimit, long newReadLimit) {
253 writeLimit = newWriteLimit;
254 readLimit = newReadLimit;
255 if (trafficCounter != null) {
256 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
257 }
258 }
259
260
261
262
263
264
265 public void configure(long newCheckInterval) {
266 checkInterval = newCheckInterval;
267 if (trafficCounter != null) {
268 trafficCounter.configure(checkInterval);
269 }
270 }
271
272
273
274
275 public long getWriteLimit() {
276 return writeLimit;
277 }
278
279
280
281
282
283
284
285
286
287
288 public void setWriteLimit(long writeLimit) {
289 this.writeLimit = writeLimit;
290 if (trafficCounter != null) {
291 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
292 }
293 }
294
295
296
297
298 public long getReadLimit() {
299 return readLimit;
300 }
301
302
303
304
305
306
307
308
309
310
311 public void setReadLimit(long readLimit) {
312 this.readLimit = readLimit;
313 if (trafficCounter != null) {
314 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
315 }
316 }
317
318
319
320
321 public long getCheckInterval() {
322 return checkInterval;
323 }
324
325
326
327
328 public void setCheckInterval(long checkInterval) {
329 this.checkInterval = checkInterval;
330 if (trafficCounter != null) {
331 trafficCounter.configure(checkInterval);
332 }
333 }
334
335
336
337
338
339
340
341
342
343
344
345
346 public void setMaxTimeWait(long maxTime) {
347 this.maxTime = checkPositive(maxTime, "maxTime");
348 }
349
350
351
352
353 public long getMaxTimeWait() {
354 return maxTime;
355 }
356
357
358
359
360 public long getMaxWriteDelay() {
361 return maxWriteDelay;
362 }
363
364
365
366
367
368
369
370
371
372
373
374 public void setMaxWriteDelay(long maxWriteDelay) {
375 this.maxWriteDelay = checkPositive(maxWriteDelay, "maxWriteDelay");
376 }
377
378
379
380
381 public long getMaxWriteSize() {
382 return maxWriteSize;
383 }
384
385
386
387
388
389
390
391
392
393
394
395
396
397 public void setMaxWriteSize(long maxWriteSize) {
398 this.maxWriteSize = maxWriteSize;
399 }
400
401
402
403
404
405
406
407
408 protected void doAccounting(TrafficCounter counter) {
409
410 }
411
412
413
414
415 static final class ReopenReadTimerTask implements Runnable {
416 final ChannelHandlerContext ctx;
417 ReopenReadTimerTask(ChannelHandlerContext ctx) {
418 this.ctx = ctx;
419 }
420
421 @Override
422 public void run() {
423 Channel channel = ctx.channel();
424 if (!channel.getOption(ChannelOption.AUTO_READ) && isHandlerActive(ctx)) {
425
426
427 if (logger.isDebugEnabled()) {
428 logger.debug("Not unsuspend: " + channel.getOption(ChannelOption.AUTO_READ) + ':' +
429 isHandlerActive(ctx));
430 }
431 channel.attr(READ_SUSPENDED).set(false);
432 } else {
433
434 if (logger.isDebugEnabled()) {
435 if (channel.getOption(ChannelOption.AUTO_READ) && !isHandlerActive(ctx)) {
436 if (logger.isDebugEnabled()) {
437 logger.debug("Unsuspend: " + channel.getOption(ChannelOption.AUTO_READ) + ':' +
438 isHandlerActive(ctx));
439 }
440 } else {
441 if (logger.isDebugEnabled()) {
442 logger.debug("Normal unsuspend: " + channel.getOption(ChannelOption.AUTO_READ) + ':'
443 + isHandlerActive(ctx));
444 }
445 }
446 }
447 channel.attr(READ_SUSPENDED).set(false);
448 channel.setOption(ChannelOption.AUTO_READ, true);
449 channel.read();
450 }
451 if (logger.isDebugEnabled()) {
452 logger.debug("Unsuspend final status => " + channel.getOption(ChannelOption.AUTO_READ) + ':'
453 + isHandlerActive(ctx));
454 }
455 }
456 }
457
458
459
460
461 void releaseReadSuspended(ChannelHandlerContext ctx) {
462 Channel channel = ctx.channel();
463 channel.attr(READ_SUSPENDED).set(false);
464 channel.setOption(ChannelOption.AUTO_READ, true);
465 }
466
467 @Override
468 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
469 long size = calculateSize(msg);
470 long now = TrafficCounter.milliSecondFromNano();
471 if (size > 0) {
472
473 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
474 wait = checkWaitReadTime(ctx, wait, now);
475 if (wait >= MINIMAL_WAIT) {
476
477
478 Channel channel = ctx.channel();
479 if (logger.isDebugEnabled()) {
480 logger.debug("Read suspend: " + wait + ':' + channel.getOption(ChannelOption.AUTO_READ) + ':'
481 + isHandlerActive(ctx));
482 }
483 if (channel.getOption(ChannelOption.AUTO_READ) && isHandlerActive(ctx)) {
484 channel.setOption(ChannelOption.AUTO_READ, false);
485 channel.attr(READ_SUSPENDED).set(true);
486
487
488 Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
489 Runnable reopenTask = attr.get();
490 if (reopenTask == null) {
491 reopenTask = new ReopenReadTimerTask(ctx);
492 attr.set(reopenTask);
493 }
494 ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
495 if (logger.isDebugEnabled()) {
496 logger.debug("Suspend final status => " + channel.getOption(ChannelOption.AUTO_READ) + ':'
497 + isHandlerActive(ctx) + " will reopened at: " + wait);
498 }
499 }
500 }
501 }
502 informReadOperation(ctx, now);
503 ctx.fireChannelRead(msg);
504 }
505
506 @Override
507 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
508 Channel channel = ctx.channel();
509 if (channel.hasAttr(REOPEN_TASK)) {
510
511 channel.attr(REOPEN_TASK).set(null);
512 }
513 }
514
515
516
517
518
519
520
521 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
522
523 return wait;
524 }
525
526
527
528
529
530 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
531
532 }
533
534 protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
535 Boolean suspended = ctx.channel().attr(READ_SUSPENDED).get();
536 return suspended == null || Boolean.FALSE.equals(suspended);
537 }
538
539 @Override
540 public void read(ChannelHandlerContext ctx) {
541 if (isHandlerActive(ctx)) {
542
543 ctx.read();
544 }
545 }
546
547 @Override
548 public Future<Void> write(final ChannelHandlerContext ctx, final Object msg) {
549 long size = calculateSize(msg);
550 long now = TrafficCounter.milliSecondFromNano();
551 Promise<Void> promise = ctx.newPromise();
552 if (size > 0) {
553
554 long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
555 if (wait >= MINIMAL_WAIT) {
556 if (logger.isDebugEnabled()) {
557 logger.debug("Write suspend: " + wait + ':'
558 + ctx.channel().getOption(ChannelOption.AUTO_READ) + ':'
559 + isHandlerActive(ctx));
560 }
561 submitWrite(ctx, msg, size, wait, now, promise);
562 return promise.asFuture();
563 }
564 }
565
566 submitWrite(ctx, msg, size, 0, now, promise);
567 return promise.asFuture();
568 }
569
570 @Deprecated
571 protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
572 final long delay, final Promise<Void> promise) {
573 submitWrite(ctx, msg, calculateSize(msg),
574 delay, TrafficCounter.milliSecondFromNano(), promise);
575 }
576
577 abstract void submitWrite(
578 ChannelHandlerContext ctx, Object msg, long size, long delay, long now, Promise<Void> promise);
579
580 @Override
581 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
582 setUserDefinedWritability(ctx, true);
583 ctx.fireChannelRegistered();
584 }
585
586
587 void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
588
589
590
591
592
593
594 }
595
596
597
598
599
600
601
602 void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
603 if (queueSize > maxWriteSize || delay > maxWriteDelay) {
604 setUserDefinedWritability(ctx, false);
605 }
606 }
607
608
609
610 void releaseWriteSuspended(ChannelHandlerContext ctx) {
611 setUserDefinedWritability(ctx, true);
612 }
613
614
615
616
617
618 public TrafficCounter trafficCounter() {
619 return trafficCounter;
620 }
621
622 @Override
623 public String toString() {
624 StringBuilder builder = new StringBuilder(290)
625 .append("TrafficShaping with Write Limit: ").append(writeLimit)
626 .append(" Read Limit: ").append(readLimit)
627 .append(" CheckInterval: ").append(checkInterval)
628 .append(" maxDelay: ").append(maxWriteDelay)
629 .append(" maxSize: ").append(maxWriteSize)
630 .append(" and Counter: ");
631 if (trafficCounter != null) {
632 builder.append(trafficCounter);
633 } else {
634 builder.append("none");
635 }
636 return builder.toString();
637 }
638
639
640
641
642
643
644
645
646
647 protected long calculateSize(Object msg) {
648
649 if (msg instanceof Buffer) {
650 return ((Buffer) msg).readableBytes();
651 }
652 if (msg instanceof FileRegion) {
653 return ((FileRegion) msg).count();
654 }
655 return -1;
656 }
657 }