1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19 import static io.netty.util.internal.ObjectUtil.checkNotNull;
20
21 import io.netty.util.concurrent.ImmediateExecutor;
22 import io.netty.util.internal.MathUtil;
23 import io.netty.util.internal.PlatformDependent;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.RejectedExecutionException;
35 import java.util.concurrent.ThreadFactory;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40 import java.util.concurrent.atomic.AtomicLong;
41
42 import static io.netty.util.internal.StringUtil.simpleClassName;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 public class HashedWheelTimer implements Timer {
86
87 static final InternalLogger logger =
88 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
89
90 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
91 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
92 private static final int INSTANCE_COUNT_LIMIT = 64;
93 private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
94 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
95 .newResourceLeakDetector(HashedWheelTimer.class, 1);
96
97 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
98 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
99
100 private final ResourceLeakTracker<HashedWheelTimer> leak;
101 private final Worker worker = new Worker();
102 private final Thread workerThread;
103
104 public static final int WORKER_STATE_INIT = 0;
105 public static final int WORKER_STATE_STARTED = 1;
106 public static final int WORKER_STATE_SHUTDOWN = 2;
107 @SuppressWarnings({"unused", "FieldMayBeFinal"})
108 private volatile int workerState;
109
110 private final long tickDuration;
111 private final HashedWheelBucket[] wheel;
112 private final int mask;
113 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
114 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
115 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
116 private final AtomicLong pendingTimeouts = new AtomicLong(0);
117 private final long maxPendingTimeouts;
118 private final Executor taskExecutor;
119
120 private volatile long startTime;
121
122
123
124
125
126
127 public HashedWheelTimer() {
128 this(Executors.defaultThreadFactory());
129 }
130
131
132
133
134
135
136
137
138
139
140
141 public HashedWheelTimer(long tickDuration, TimeUnit unit) {
142 this(Executors.defaultThreadFactory(), tickDuration, unit);
143 }
144
145
146
147
148
149
150
151
152
153
154
155 public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
156 this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
157 }
158
159
160
161
162
163
164
165
166
167
168 public HashedWheelTimer(ThreadFactory threadFactory) {
169 this(threadFactory, 100, TimeUnit.MILLISECONDS);
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183 public HashedWheelTimer(
184 ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
185 this(threadFactory, tickDuration, unit, 512);
186 }
187
188
189
190
191
192
193
194
195
196
197
198
199
200 public HashedWheelTimer(
201 ThreadFactory threadFactory,
202 long tickDuration, TimeUnit unit, int ticksPerWheel) {
203 this(threadFactory, tickDuration, unit, ticksPerWheel, true);
204 }
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221 public HashedWheelTimer(
222 ThreadFactory threadFactory,
223 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
224 this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
225 }
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247 public HashedWheelTimer(
248 ThreadFactory threadFactory,
249 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
250 long maxPendingTimeouts) {
251 this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
252 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
253 }
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277 public HashedWheelTimer(
278 ThreadFactory threadFactory,
279 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
280 long maxPendingTimeouts, Executor taskExecutor) {
281
282 checkNotNull(threadFactory, "threadFactory");
283 checkNotNull(unit, "unit");
284 checkPositive(tickDuration, "tickDuration");
285 checkPositive(ticksPerWheel, "ticksPerWheel");
286 this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
287
288
289 wheel = createWheel(ticksPerWheel);
290 mask = wheel.length - 1;
291
292
293 long duration = unit.toNanos(tickDuration);
294
295
296 if (duration >= Long.MAX_VALUE / wheel.length) {
297 throw new IllegalArgumentException(String.format(
298 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
299 tickDuration, Long.MAX_VALUE / wheel.length));
300 }
301
302 if (duration < MILLISECOND_NANOS) {
303 logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
304 tickDuration, MILLISECOND_NANOS);
305 this.tickDuration = MILLISECOND_NANOS;
306 } else {
307 this.tickDuration = duration;
308 }
309
310 workerThread = threadFactory.newThread(worker);
311
312 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
313
314 this.maxPendingTimeouts = maxPendingTimeouts;
315
316 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
317 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
318 reportTooManyInstances();
319 }
320 }
321
322 @Override
323 protected void finalize() throws Throwable {
324 try {
325 super.finalize();
326 } finally {
327
328
329 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
330 INSTANCE_COUNTER.decrementAndGet();
331 }
332 }
333 }
334
335 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
336 ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
337
338 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
339 for (int i = 0; i < wheel.length; i ++) {
340 wheel[i] = new HashedWheelBucket();
341 }
342 return wheel;
343 }
344
345
346
347
348
349
350
351
352 public void start() {
353 switch (WORKER_STATE_UPDATER.get(this)) {
354 case WORKER_STATE_INIT:
355 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
356 workerThread.start();
357 }
358 break;
359 case WORKER_STATE_STARTED:
360 break;
361 case WORKER_STATE_SHUTDOWN:
362 throw new IllegalStateException("cannot be started once stopped");
363 default:
364 throw new Error("Invalid WorkerState");
365 }
366
367
368 while (startTime == 0) {
369 try {
370 startTimeInitialized.await();
371 } catch (InterruptedException ignore) {
372
373 }
374 }
375 }
376
377 @Override
378 public Set<Timeout> stop() {
379 if (Thread.currentThread() == workerThread) {
380 throw new IllegalStateException(
381 HashedWheelTimer.class.getSimpleName() +
382 ".stop() cannot be called from " +
383 TimerTask.class.getSimpleName());
384 }
385
386 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
387
388 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
389 INSTANCE_COUNTER.decrementAndGet();
390 if (leak != null) {
391 boolean closed = leak.close(this);
392 assert closed;
393 }
394 }
395
396 return Collections.emptySet();
397 }
398
399 try {
400 boolean interrupted = false;
401 while (workerThread.isAlive()) {
402 workerThread.interrupt();
403 try {
404 workerThread.join(100);
405 } catch (InterruptedException ignored) {
406 interrupted = true;
407 }
408 }
409
410 if (interrupted) {
411 Thread.currentThread().interrupt();
412 }
413 } finally {
414 INSTANCE_COUNTER.decrementAndGet();
415 if (leak != null) {
416 boolean closed = leak.close(this);
417 assert closed;
418 }
419 }
420 Set<Timeout> unprocessed = worker.unprocessedTimeouts();
421 Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
422 for (Timeout timeout : unprocessed) {
423 if (timeout.cancel()) {
424 cancelled.add(timeout);
425 }
426 }
427 return cancelled;
428 }
429
430 @Override
431 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
432 checkNotNull(task, "task");
433 checkNotNull(unit, "unit");
434
435 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
436
437 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
438 pendingTimeouts.decrementAndGet();
439 throw new RejectedExecutionException("Number of pending timeouts ("
440 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
441 + "timeouts (" + maxPendingTimeouts + ")");
442 }
443
444 start();
445
446
447
448 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
449
450
451 if (delay > 0 && deadline < 0) {
452 deadline = Long.MAX_VALUE;
453 }
454 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
455 timeouts.add(timeout);
456 return timeout;
457 }
458
459
460
461
462 public long pendingTimeouts() {
463 return pendingTimeouts.get();
464 }
465
466 private static void reportTooManyInstances() {
467 if (logger.isErrorEnabled()) {
468 String resourceType = simpleClassName(HashedWheelTimer.class);
469 logger.error("You are creating too many " + resourceType + " instances. " +
470 resourceType + " is a shared resource that must be reused across the JVM, " +
471 "so that only a few instances are created.");
472 }
473 }
474
475 private final class Worker implements Runnable {
476 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
477
478 private long tick;
479
480 @Override
481 public void run() {
482
483 startTime = System.nanoTime();
484 if (startTime == 0) {
485
486 startTime = 1;
487 }
488
489
490 startTimeInitialized.countDown();
491
492 do {
493 final long deadline = waitForNextTick();
494 if (deadline > 0) {
495 int idx = (int) (tick & mask);
496 processCancelledTasks();
497 HashedWheelBucket bucket =
498 wheel[idx];
499 transferTimeoutsToBuckets();
500 bucket.expireTimeouts(deadline);
501 tick++;
502 }
503 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
504
505
506 for (HashedWheelBucket bucket: wheel) {
507 bucket.clearTimeouts(unprocessedTimeouts);
508 }
509 for (;;) {
510 HashedWheelTimeout timeout = timeouts.poll();
511 if (timeout == null) {
512 break;
513 }
514 if (!timeout.isCancelled()) {
515 unprocessedTimeouts.add(timeout);
516 }
517 }
518 processCancelledTasks();
519 }
520
521 private void transferTimeoutsToBuckets() {
522
523
524 for (int i = 0; i < 100000; i++) {
525 HashedWheelTimeout timeout = timeouts.poll();
526 if (timeout == null) {
527
528 break;
529 }
530 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
531
532 continue;
533 }
534
535 long calculated = timeout.deadline / tickDuration;
536 timeout.remainingRounds = (calculated - tick) / wheel.length;
537
538 final long ticks = Math.max(calculated, tick);
539 int stopIndex = (int) (ticks & mask);
540
541 HashedWheelBucket bucket = wheel[stopIndex];
542 bucket.addTimeout(timeout);
543 }
544 }
545
546 private void processCancelledTasks() {
547 for (;;) {
548 HashedWheelTimeout timeout = cancelledTimeouts.poll();
549 if (timeout == null) {
550
551 break;
552 }
553 try {
554 timeout.removeAfterCancellation();
555 } catch (Throwable t) {
556 if (logger.isWarnEnabled()) {
557 logger.warn("An exception was thrown while process a cancellation task", t);
558 }
559 }
560 }
561 }
562
563
564
565
566
567
568
569 private long waitForNextTick() {
570 long deadline = tickDuration * (tick + 1);
571
572 for (;;) {
573 final long currentTime = System.nanoTime() - startTime;
574 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
575
576 if (sleepTimeMs <= 0) {
577 if (currentTime == Long.MIN_VALUE) {
578 return -Long.MAX_VALUE;
579 } else {
580 return currentTime;
581 }
582 }
583
584
585
586
587
588
589 if (PlatformDependent.isWindows()) {
590 sleepTimeMs = sleepTimeMs / 10 * 10;
591 if (sleepTimeMs == 0) {
592 sleepTimeMs = 1;
593 }
594 }
595
596 try {
597 Thread.sleep(sleepTimeMs);
598 } catch (InterruptedException ignored) {
599 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
600 return Long.MIN_VALUE;
601 }
602 }
603 }
604 }
605
606 public Set<Timeout> unprocessedTimeouts() {
607 return Collections.unmodifiableSet(unprocessedTimeouts);
608 }
609 }
610
611 private static final class HashedWheelTimeout implements Timeout, Runnable {
612
613 private static final int ST_INIT = 0;
614 private static final int ST_CANCELLED = 1;
615 private static final int ST_EXPIRED = 2;
616 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
617 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
618
619 private final HashedWheelTimer timer;
620 private final TimerTask task;
621 private final long deadline;
622
623 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
624 private volatile int state = ST_INIT;
625
626
627
628 long remainingRounds;
629
630
631
632 HashedWheelTimeout next;
633 HashedWheelTimeout prev;
634
635
636 HashedWheelBucket bucket;
637
638 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
639 this.timer = timer;
640 this.task = task;
641 this.deadline = deadline;
642 }
643
644 @Override
645 public Timer timer() {
646 return timer;
647 }
648
649 @Override
650 public TimerTask task() {
651 return task;
652 }
653
654 @Override
655 public boolean cancel() {
656
657 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
658 return false;
659 }
660
661
662
663 timer.cancelledTimeouts.add(this);
664 return true;
665 }
666
667 private void remove() {
668 HashedWheelBucket bucket = this.bucket;
669 if (bucket != null) {
670 bucket.remove(this);
671 }
672 timer.pendingTimeouts.decrementAndGet();
673 }
674 void removeAfterCancellation() {
675 remove();
676 task.cancelled(this);
677 }
678
679 public boolean compareAndSetState(int expected, int state) {
680 return STATE_UPDATER.compareAndSet(this, expected, state);
681 }
682
683 public int state() {
684 return state;
685 }
686
687 @Override
688 public boolean isCancelled() {
689 return state() == ST_CANCELLED;
690 }
691
692 @Override
693 public boolean isExpired() {
694 return state() == ST_EXPIRED;
695 }
696
697 public void expire() {
698 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
699 return;
700 }
701
702 try {
703 remove();
704 timer.taskExecutor.execute(this);
705 } catch (Throwable t) {
706 if (logger.isWarnEnabled()) {
707 logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
708 + " for execution.", t);
709 }
710 }
711 }
712
713 @Override
714 public void run() {
715 try {
716 task.run(this);
717 } catch (Throwable t) {
718 if (logger.isWarnEnabled()) {
719 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
720 }
721 }
722 }
723
724 @Override
725 public String toString() {
726 final long currentTime = System.nanoTime();
727 long remaining = deadline - currentTime + timer.startTime;
728
729 StringBuilder buf = new StringBuilder(192)
730 .append(simpleClassName(this))
731 .append('(')
732 .append("deadline: ");
733 if (remaining > 0) {
734 buf.append(remaining)
735 .append(" ns later");
736 } else if (remaining < 0) {
737 buf.append(-remaining)
738 .append(" ns ago");
739 } else {
740 buf.append("now");
741 }
742
743 if (isCancelled()) {
744 buf.append(", cancelled");
745 }
746
747 return buf.append(", task: ")
748 .append(task())
749 .append(')')
750 .toString();
751 }
752 }
753
754
755
756
757
758
759 private static final class HashedWheelBucket {
760
761 private HashedWheelTimeout head;
762 private HashedWheelTimeout tail;
763
764
765
766
767 public void addTimeout(HashedWheelTimeout timeout) {
768 assert timeout.bucket == null;
769 timeout.bucket = this;
770 if (head == null) {
771 head = tail = timeout;
772 } else {
773 tail.next = timeout;
774 timeout.prev = tail;
775 tail = timeout;
776 }
777 }
778
779
780
781
782 public void expireTimeouts(long deadline) {
783 HashedWheelTimeout timeout = head;
784
785
786 while (timeout != null) {
787 HashedWheelTimeout next = timeout.next;
788 if (timeout.remainingRounds <= 0) {
789 if (timeout.deadline <= deadline) {
790 timeout.expire();
791 } else {
792
793 throw new IllegalStateException(String.format(
794 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
795 }
796 } else if (!timeout.isCancelled()) {
797 timeout.remainingRounds --;
798 }
799 timeout = next;
800 }
801 }
802
803 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
804 HashedWheelTimeout next = timeout.next;
805
806 if (timeout.prev != null) {
807 timeout.prev.next = next;
808 }
809 if (timeout.next != null) {
810 timeout.next.prev = timeout.prev;
811 }
812
813 if (timeout == head) {
814
815 if (timeout == tail) {
816 tail = null;
817 head = null;
818 } else {
819 head = next;
820 }
821 } else if (timeout == tail) {
822
823 tail = timeout.prev;
824 }
825
826 timeout.prev = null;
827 timeout.next = null;
828 timeout.bucket = null;
829 return next;
830 }
831
832
833
834
835 public void clearTimeouts(Set<Timeout> set) {
836 for (;;) {
837 HashedWheelTimeout timeout = pollTimeout();
838 if (timeout == null) {
839 return;
840 }
841 if (timeout.isExpired() || timeout.isCancelled()) {
842 continue;
843 }
844 set.add(timeout);
845 }
846 }
847
848 private HashedWheelTimeout pollTimeout() {
849 HashedWheelTimeout head = this.head;
850 if (head == null) {
851 return null;
852 }
853 HashedWheelTimeout next = head.next;
854 if (next == null) {
855 tail = this.head = null;
856 } else {
857 this.head = next;
858 next.prev = null;
859 }
860
861
862 head.next = null;
863 head.prev = null;
864 head.bucket = null;
865 return head;
866 }
867 }
868 }