1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util;
17
18 import io.netty5.util.concurrent.ImmediateExecutor;
19 import io.netty5.util.internal.PlatformDependent;
20 import io.netty5.util.internal.logging.InternalLogger;
21 import io.netty5.util.internal.logging.InternalLoggerFactory;
22
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.Queue;
26 import java.util.Set;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
36 import java.util.concurrent.atomic.AtomicLong;
37
38 import static io.netty5.util.internal.ObjectUtil.checkInRange;
39 import static io.netty5.util.internal.ObjectUtil.checkPositive;
40 import static io.netty5.util.internal.StringUtil.simpleClassName;
41 import static java.util.Objects.requireNonNull;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 public class HashedWheelTimer implements Timer {
85
86 static final InternalLogger logger =
87 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
88
89 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
90 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
91 private static final int INSTANCE_COUNT_LIMIT = 64;
92 private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
93 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
94 .newResourceLeakDetector(HashedWheelTimer.class, 1);
95
96 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
97 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
98
99 private final ResourceLeakTracker<HashedWheelTimer> leak;
100 private final Worker worker = new Worker();
101 private final Thread workerThread;
102
103 public static final int WORKER_STATE_INIT = 0;
104 public static final int WORKER_STATE_STARTED = 1;
105 public static final int WORKER_STATE_SHUTDOWN = 2;
106 @SuppressWarnings({"unused", "FieldMayBeFinal"})
107 private volatile int workerState;
108
109 private final long tickDuration;
110 private final HashedWheelBucket[] wheel;
111 private final int mask;
112 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
113 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
114 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
115 private final AtomicLong pendingTimeouts = new AtomicLong(0);
116 private final long maxPendingTimeouts;
117 private final Executor taskExecutor;
118
119 private volatile long startTime;
120
121
122
123
124
125
/**
 * Creates a new timer whose worker thread is obtained from
 * {@link Executors#defaultThreadFactory()}; all other settings are left to the
 * constructor this one delegates to.
 */
public HashedWheelTimer() {
    this(Executors.defaultThreadFactory());
}
129
130
131
132
133
134
135
136
137
138
139
/**
 * Creates a new timer with the given tick duration, using
 * {@link Executors#defaultThreadFactory()} for the worker thread.
 *
 * @param tickDuration the duration between ticks, expressed in {@code unit}
 * @param unit         the time unit of {@code tickDuration}
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
    this(Executors.defaultThreadFactory(), tickDuration, unit);
}
143
144
145
146
147
148
149
150
151
152
153
/**
 * Creates a new timer with the given tick duration and wheel size, using
 * {@link Executors#defaultThreadFactory()} for the worker thread.
 *
 * @param tickDuration  the duration between ticks, expressed in {@code unit}
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested size of the wheel
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
157
158
159
160
161
162
163
164
165
166
/**
 * Creates a new timer with a tick duration of 100 milliseconds.
 *
 * @param threadFactory the factory that creates the background worker thread
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
170
171
172
173
174
175
176
177
178
179
180
181
/**
 * Creates a new timer with a requested wheel size of 512 ticks.
 *
 * @param threadFactory the factory that creates the background worker thread
 * @param tickDuration  the duration between ticks, expressed in {@code unit}
 * @param unit          the time unit of {@code tickDuration}
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);
}
186
187
188
189
190
191
192
193
194
195
196
197
198
/**
 * Creates a new timer with leak detection enabled.
 *
 * @param threadFactory the factory that creates the background worker thread
 * @param tickDuration  the duration between ticks, expressed in {@code unit}
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested size of the wheel
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
/**
 * Creates a new timer without a limit on pending timeouts
 * ({@code maxPendingTimeouts} is passed as {@code -1}).
 *
 * @param threadFactory the factory that creates the background worker thread
 * @param tickDuration  the duration between ticks, expressed in {@code unit}
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested size of the wheel
 * @param leakDetection {@code true} to always track this timer for leaks
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
        boolean leakDetection) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
/**
 * Creates a new timer that runs expired tasks through {@link ImmediateExecutor#INSTANCE}.
 *
 * @param threadFactory      the factory that creates the background worker thread
 * @param tickDuration       the duration between ticks, expressed in {@code unit}
 * @param unit               the time unit of {@code tickDuration}
 * @param ticksPerWheel      the requested size of the wheel
 * @param leakDetection      {@code true} to always track this timer for leaks
 * @param maxPendingTimeouts the maximum number of pending timeouts; a value {@code <= 0}
 *                           disables the limit
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
        boolean leakDetection, long maxPendingTimeouts) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
            maxPendingTimeouts, ImmediateExecutor.INSTANCE);
}
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276 public HashedWheelTimer(
277 ThreadFactory threadFactory,
278 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
279 long maxPendingTimeouts, Executor taskExecutor) {
280
281 requireNonNull(threadFactory, "threadFactory");
282 requireNonNull(unit, "unit");
283 checkPositive(tickDuration, "tickDuration");
284 checkPositive(ticksPerWheel, "ticksPerWheel");
285 this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor");
286
287
288 wheel = createWheel(ticksPerWheel);
289 mask = wheel.length - 1;
290
291
292 long duration = unit.toNanos(tickDuration);
293
294
295 if (duration >= Long.MAX_VALUE / wheel.length) {
296 throw new IllegalArgumentException(String.format(
297 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
298 tickDuration, Long.MAX_VALUE / wheel.length));
299 }
300
301 if (duration < MILLISECOND_NANOS) {
302 logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
303 tickDuration, MILLISECOND_NANOS);
304 this.tickDuration = MILLISECOND_NANOS;
305 } else {
306 this.tickDuration = duration;
307 }
308
309 workerThread = threadFactory.newThread(worker);
310
311 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
312
313 this.maxPendingTimeouts = maxPendingTimeouts;
314
315 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
316 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
317 reportTooManyInstances();
318 }
319 }
320
321 @Override
322 protected void finalize() throws Throwable {
323 try {
324 super.finalize();
325 } finally {
326
327
328 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
329 INSTANCE_COUNTER.decrementAndGet();
330 }
331 }
332 }
333
334 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
335
336 checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
337
338 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
339 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
340 for (int i = 0; i < wheel.length; i ++) {
341 wheel[i] = new HashedWheelBucket();
342 }
343 return wheel;
344 }
345
346 private static int normalizeTicksPerWheel(int ticksPerWheel) {
347 int normalizedTicksPerWheel = 1;
348 while (normalizedTicksPerWheel < ticksPerWheel) {
349 normalizedTicksPerWheel <<= 1;
350 }
351 return normalizedTicksPerWheel;
352 }
353
354
355
356
357
358
359
360
361 public void start() {
362 switch (WORKER_STATE_UPDATER.get(this)) {
363 case WORKER_STATE_INIT:
364 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
365 workerThread.start();
366 }
367 break;
368 case WORKER_STATE_STARTED:
369 break;
370 case WORKER_STATE_SHUTDOWN:
371 throw new IllegalStateException("cannot be started once stopped");
372 default:
373 throw new Error("Invalid WorkerState");
374 }
375
376
377 while (startTime == 0) {
378 try {
379 startTimeInitialized.await();
380 } catch (InterruptedException ignore) {
381
382 }
383 }
384 }
385
386 @Override
387 public Set<Timeout> stop() {
388 if (Thread.currentThread() == workerThread) {
389 throw new IllegalStateException(
390 HashedWheelTimer.class.getSimpleName() +
391 ".stop() cannot be called from " +
392 TimerTask.class.getSimpleName());
393 }
394
395 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
396
397 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
398 INSTANCE_COUNTER.decrementAndGet();
399 if (leak != null) {
400 boolean closed = leak.close(this);
401 assert closed;
402 }
403 }
404
405 return Collections.emptySet();
406 }
407
408 try {
409 boolean interrupted = false;
410 while (workerThread.isAlive()) {
411 workerThread.interrupt();
412 try {
413 workerThread.join(100);
414 } catch (InterruptedException ignored) {
415 interrupted = true;
416 }
417 }
418
419 if (interrupted) {
420 Thread.currentThread().interrupt();
421 }
422 } finally {
423 INSTANCE_COUNTER.decrementAndGet();
424 if (leak != null) {
425 boolean closed = leak.close(this);
426 assert closed;
427 }
428 }
429 return worker.unprocessedTimeouts();
430 }
431
432 @Override
433 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
434 requireNonNull(task, "task");
435 requireNonNull(unit, "unit");
436
437 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
438
439 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
440 pendingTimeouts.decrementAndGet();
441 throw new RejectedExecutionException("Number of pending timeouts ("
442 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
443 + "timeouts (" + maxPendingTimeouts + ")");
444 }
445
446 start();
447
448
449
450 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
451
452
453 if (delay > 0 && deadline < 0) {
454 deadline = Long.MAX_VALUE;
455 }
456 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
457 timeouts.add(timeout);
458 return timeout;
459 }
460
461
462
463
464 public long pendingTimeouts() {
465 return pendingTimeouts.get();
466 }
467
468 private static void reportTooManyInstances() {
469 if (logger.isErrorEnabled()) {
470 String resourceType = simpleClassName(HashedWheelTimer.class);
471 logger.error("You are creating too many " + resourceType + " instances. " +
472 resourceType + " is a shared resource that must be reused across the JVM, " +
473 "so that only a few instances are created.");
474 }
475 }
476
477 private final class Worker implements Runnable {
478 private final Set<Timeout> unprocessedTimeouts = new HashSet<>();
479
480 private long tick;
481
482 @Override
483 public void run() {
484
485 startTime = System.nanoTime();
486 if (startTime == 0) {
487
488 startTime = 1;
489 }
490
491
492 startTimeInitialized.countDown();
493
494 do {
495 final long deadline = waitForNextTick();
496 if (deadline > 0) {
497 int idx = (int) (tick & mask);
498 processCancelledTasks();
499 HashedWheelBucket bucket =
500 wheel[idx];
501 transferTimeoutsToBuckets();
502 bucket.expireTimeouts(deadline);
503 tick++;
504 }
505 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
506
507
508 for (HashedWheelBucket bucket: wheel) {
509 bucket.clearTimeouts(unprocessedTimeouts);
510 }
511 for (;;) {
512 HashedWheelTimeout timeout = timeouts.poll();
513 if (timeout == null) {
514 break;
515 }
516 if (!timeout.isCancelled()) {
517 unprocessedTimeouts.add(timeout);
518 }
519 }
520 processCancelledTasks();
521 }
522
523 private void transferTimeoutsToBuckets() {
524
525
526 for (int i = 0; i < 100000; i++) {
527 HashedWheelTimeout timeout = timeouts.poll();
528 if (timeout == null) {
529
530 break;
531 }
532 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
533
534 continue;
535 }
536
537 long calculated = timeout.deadline / tickDuration;
538 timeout.remainingRounds = (calculated - tick) / wheel.length;
539
540 final long ticks = Math.max(calculated, tick);
541 int stopIndex = (int) (ticks & mask);
542
543 HashedWheelBucket bucket = wheel[stopIndex];
544 bucket.addTimeout(timeout);
545 }
546 }
547
548 private void processCancelledTasks() {
549 for (;;) {
550 HashedWheelTimeout timeout = cancelledTimeouts.poll();
551 if (timeout == null) {
552
553 break;
554 }
555 try {
556 timeout.remove();
557 } catch (Throwable t) {
558 if (logger.isWarnEnabled()) {
559 logger.warn("An exception was thrown while process a cancellation task", t);
560 }
561 }
562 }
563 }
564
565
566
567
568
569
570
571 private long waitForNextTick() {
572 long deadline = tickDuration * (tick + 1);
573
574 for (;;) {
575 final long currentTime = System.nanoTime() - startTime;
576 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
577
578 if (sleepTimeMs <= 0) {
579 if (currentTime == Long.MIN_VALUE) {
580 return -Long.MAX_VALUE;
581 } else {
582 return currentTime;
583 }
584 }
585
586
587
588
589
590
591 if (PlatformDependent.isWindows()) {
592 sleepTimeMs = sleepTimeMs / 10 * 10;
593 if (sleepTimeMs == 0) {
594 sleepTimeMs = 1;
595 }
596 }
597
598 try {
599 Thread.sleep(sleepTimeMs);
600 } catch (InterruptedException ignored) {
601 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
602 return Long.MIN_VALUE;
603 }
604 }
605 }
606 }
607
608 public Set<Timeout> unprocessedTimeouts() {
609 return Collections.unmodifiableSet(unprocessedTimeouts);
610 }
611 }
612
613 private static final class HashedWheelTimeout implements Timeout, Runnable {
614
615 private static final int ST_INIT = 0;
616 private static final int ST_CANCELLED = 1;
617 private static final int ST_EXPIRED = 2;
618 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
619 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
620
621 private final HashedWheelTimer timer;
622 private final TimerTask task;
623 private final long deadline;
624
625 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
626 private volatile int state = ST_INIT;
627
628
629
630 long remainingRounds;
631
632
633
634 HashedWheelTimeout next;
635 HashedWheelTimeout prev;
636
637
638 HashedWheelBucket bucket;
639
640 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
641 this.timer = timer;
642 this.task = task;
643 this.deadline = deadline;
644 }
645
646 @Override
647 public Timer timer() {
648 return timer;
649 }
650
651 @Override
652 public TimerTask task() {
653 return task;
654 }
655
656 @Override
657 public boolean cancel() {
658
659 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
660 return false;
661 }
662
663
664
665 timer.cancelledTimeouts.add(this);
666 return true;
667 }
668
669 void remove() {
670 HashedWheelBucket bucket = this.bucket;
671 if (bucket != null) {
672 bucket.remove(this);
673 } else {
674 timer.pendingTimeouts.decrementAndGet();
675 }
676 }
677
678 public boolean compareAndSetState(int expected, int state) {
679 return STATE_UPDATER.compareAndSet(this, expected, state);
680 }
681
682 public int state() {
683 return state;
684 }
685
686 @Override
687 public boolean isCancelled() {
688 return state() == ST_CANCELLED;
689 }
690
691 @Override
692 public boolean isExpired() {
693 return state() == ST_EXPIRED;
694 }
695
696 public void expire() {
697 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
698 return;
699 }
700
701 try {
702 timer.taskExecutor.execute(this);
703 } catch (Throwable t) {
704 if (logger.isWarnEnabled()) {
705 logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
706 + " for execution.", t);
707 }
708 }
709 }
710
711 @Override
712 public void run() {
713 try {
714 task.run(this);
715 } catch (Throwable t) {
716 if (logger.isWarnEnabled()) {
717 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
718 }
719 }
720 }
721
722 @Override
723 public String toString() {
724 final long currentTime = System.nanoTime();
725 long remaining = deadline - currentTime + timer.startTime;
726
727 StringBuilder buf = new StringBuilder(192)
728 .append(simpleClassName(this))
729 .append('(')
730 .append("deadline: ");
731 if (remaining > 0) {
732 buf.append(remaining)
733 .append(" ns later");
734 } else if (remaining < 0) {
735 buf.append(-remaining)
736 .append(" ns ago");
737 } else {
738 buf.append("now");
739 }
740
741 if (isCancelled()) {
742 buf.append(", cancelled");
743 }
744
745 return buf.append(", task: ")
746 .append(task())
747 .append(')')
748 .toString();
749 }
750 }
751
752
753
754
755
756
757 private static final class HashedWheelBucket {
758
759 private HashedWheelTimeout head;
760 private HashedWheelTimeout tail;
761
762
763
764
765 public void addTimeout(HashedWheelTimeout timeout) {
766 assert timeout.bucket == null;
767 timeout.bucket = this;
768 if (head == null) {
769 head = tail = timeout;
770 } else {
771 tail.next = timeout;
772 timeout.prev = tail;
773 tail = timeout;
774 }
775 }
776
777
778
779
780 public void expireTimeouts(long deadline) {
781 HashedWheelTimeout timeout = head;
782
783
784 while (timeout != null) {
785 HashedWheelTimeout next = timeout.next;
786 if (timeout.remainingRounds <= 0) {
787 next = remove(timeout);
788 if (timeout.deadline <= deadline) {
789 timeout.expire();
790 } else {
791
792 throw new IllegalStateException(String.format(
793 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
794 }
795 } else if (timeout.isCancelled()) {
796 next = remove(timeout);
797 } else {
798 timeout.remainingRounds --;
799 }
800 timeout = next;
801 }
802 }
803
804 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
805 HashedWheelTimeout next = timeout.next;
806
807 if (timeout.prev != null) {
808 timeout.prev.next = next;
809 }
810 if (timeout.next != null) {
811 timeout.next.prev = timeout.prev;
812 }
813
814 if (timeout == head) {
815
816 if (timeout == tail) {
817 tail = null;
818 head = null;
819 } else {
820 head = next;
821 }
822 } else if (timeout == tail) {
823
824 tail = timeout.prev;
825 }
826
827 timeout.prev = null;
828 timeout.next = null;
829 timeout.bucket = null;
830 timeout.timer.pendingTimeouts.decrementAndGet();
831 return next;
832 }
833
834
835
836
837 public void clearTimeouts(Set<Timeout> set) {
838 for (;;) {
839 HashedWheelTimeout timeout = pollTimeout();
840 if (timeout == null) {
841 return;
842 }
843 if (timeout.isExpired() || timeout.isCancelled()) {
844 continue;
845 }
846 set.add(timeout);
847 }
848 }
849
850 private HashedWheelTimeout pollTimeout() {
851 HashedWheelTimeout head = this.head;
852 if (head == null) {
853 return null;
854 }
855 HashedWheelTimeout next = head.next;
856 if (next == null) {
857 tail = this.head = null;
858 } else {
859 this.head = next;
860 next.prev = null;
861 }
862
863
864 head.next = null;
865 head.prev = null;
866 head.bucket = null;
867 return head;
868 }
869 }
870 }