1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import static io.netty.util.internal.ObjectUtil.checkInRange;
19 import static io.netty.util.internal.ObjectUtil.checkPositive;
20 import static io.netty.util.internal.ObjectUtil.checkNotNull;
21
22 import io.netty.util.concurrent.ImmediateExecutor;
23 import io.netty.util.internal.PlatformDependent;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.RejectedExecutionException;
35 import java.util.concurrent.ThreadFactory;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40 import java.util.concurrent.atomic.AtomicLong;
41
42 import static io.netty.util.internal.StringUtil.simpleClassName;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 public class HashedWheelTimer implements Timer {
86
// Logger used by the timer and by its nested classes.
static final InternalLogger logger =
        InternalLoggerFactory.getInstance(HashedWheelTimer.class);

// Number of live timer instances: incremented by the constructor and
// decremented when an instance is stopped or finalized.
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
// Flipped to true the first time the instance limit is exceeded so that the
// "too many instances" error is reported only once.
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
// Instance count above which reportTooManyInstances() is triggered.
private static final int INSTANCE_COUNT_LIMIT = 64;
// One millisecond expressed in nanoseconds; the smallest tick duration the constructor accepts.
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
// Leak detector used to track timers that are never stopped.
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
        .newResourceLeakDetector(HashedWheelTimer.class, 1);

// Atomic accessor for the volatile workerState field below.
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");

// Leak tracker for this instance; null when the constructor decided not to track it.
private final ResourceLeakTracker<HashedWheelTimer> leak;
// The tick loop, and the thread (created by the supplied ThreadFactory) that runs it.
private final Worker worker = new Worker();
private final Thread workerThread;

// Life-cycle states stored in workerState: INIT -> STARTED -> SHUTDOWN (or INIT -> SHUTDOWN).
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
// Only ever read and written through WORKER_STATE_UPDATER.
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private volatile int workerState;

// Length of one tick in nanoseconds.
private final long tickDuration;
// The wheel itself; its length is a power of two so that (tick & mask) selects a slot.
private final HashedWheelBucket[] wheel;
// wheel.length - 1.
private final int mask;
// Counted down by the worker once startTime has been published; start() waits on it.
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
// Newly scheduled timeouts waiting to be moved into a bucket by the worker.
// NOTE(review): presumably multi-producer/single-consumer queues, with the worker
// thread as the only consumer - confirm against PlatformDependent.newMpscQueue().
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// Timeouts whose cancel() succeeded and that the worker still has to unlink.
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
// Number of timeouts that have been scheduled and not yet removed.
private final AtomicLong pendingTimeouts = new AtomicLong(0);
// Upper bound for pendingTimeouts; a value <= 0 disables the check in newTimeout().
private final long maxPendingTimeouts;
// Executor that expired timeouts are handed to for execution.
private final Executor taskExecutor;

// System.nanoTime() sampled by the worker when it starts; 0 means "not initialized yet".
private volatile long startTime;
121
122
123
124
125
126
/**
 * Creates a new timer whose worker thread is obtained from
 * {@link Executors#defaultThreadFactory()}. All remaining settings are the
 * defaults applied by {@link #HashedWheelTimer(ThreadFactory)}.
 */
public HashedWheelTimer() {
this(Executors.defaultThreadFactory());
}
130
131
132
133
134
135
136
137
138
139
140
/**
 * Creates a new timer with the given tick duration, using
 * {@link Executors#defaultThreadFactory()} to create the worker thread.
 *
 * @param tickDuration the duration between ticks
 * @param unit         the time unit of {@code tickDuration}
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit);
}
144
145
146
147
148
149
150
151
152
153
154
/**
 * Creates a new timer with the given tick duration and wheel size, using
 * {@link Executors#defaultThreadFactory()} to create the worker thread.
 *
 * @param tickDuration  the duration between ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
158
159
160
161
162
163
164
165
166
167
/**
 * Creates a new timer with a tick duration of 100 milliseconds.
 *
 * @param threadFactory the factory used to create the worker thread
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
171
172
173
174
175
176
177
178
179
180
181
182
/**
 * Creates a new timer with a requested wheel size of 512 slots.
 *
 * @param threadFactory the factory used to create the worker thread
 * @param tickDuration  the duration between ticks
 * @param unit          the time unit of {@code tickDuration}
 */
public HashedWheelTimer(
ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(threadFactory, tickDuration, unit, 512);
}
187
188
189
190
191
192
193
194
195
196
197
198
199
/**
 * Creates a new timer with leak detection enabled.
 *
 * @param threadFactory the factory used to create the worker thread
 * @param tickDuration  the duration between ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 */
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
/**
 * Creates a new timer without a limit on the number of pending timeouts
 * ({@code -1} is passed as {@code maxPendingTimeouts}).
 *
 * @param threadFactory the factory used to create the worker thread
 * @param tickDuration  the duration between ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 * @param leakDetection whether this instance should be leak-tracked
 */
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
/**
 * Creates a new timer that hands expired tasks to {@link ImmediateExecutor#INSTANCE}.
 * NOTE(review): presumably that executor runs each task directly on the thread
 * that submits it (here the worker thread) - confirm against ImmediateExecutor.
 *
 * @param threadFactory      the factory used to create the worker thread
 * @param tickDuration       the duration between ticks
 * @param unit               the time unit of {@code tickDuration}
 * @param ticksPerWheel      the requested number of slots in the wheel
 * @param leakDetection      whether this instance should be leak-tracked
 * @param maxPendingTimeouts the maximum number of pending timeouts
 */
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
maxPendingTimeouts, ImmediateExecutor.INSTANCE);
}
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277 public HashedWheelTimer(
278 ThreadFactory threadFactory,
279 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
280 long maxPendingTimeouts, Executor taskExecutor) {
281
282 checkNotNull(threadFactory, "threadFactory");
283 checkNotNull(unit, "unit");
284 checkPositive(tickDuration, "tickDuration");
285 checkPositive(ticksPerWheel, "ticksPerWheel");
286 this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
287
288
289 wheel = createWheel(ticksPerWheel);
290 mask = wheel.length - 1;
291
292
293 long duration = unit.toNanos(tickDuration);
294
295
296 if (duration >= Long.MAX_VALUE / wheel.length) {
297 throw new IllegalArgumentException(String.format(
298 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
299 tickDuration, Long.MAX_VALUE / wheel.length));
300 }
301
302 if (duration < MILLISECOND_NANOS) {
303 logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
304 tickDuration, MILLISECOND_NANOS);
305 this.tickDuration = MILLISECOND_NANOS;
306 } else {
307 this.tickDuration = duration;
308 }
309
310 workerThread = threadFactory.newThread(worker);
311
312 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
313
314 this.maxPendingTimeouts = maxPendingTimeouts;
315
316 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
317 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
318 reportTooManyInstances();
319 }
320 }
321
322 @Override
323 protected void finalize() throws Throwable {
324 try {
325 super.finalize();
326 } finally {
327
328
329 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
330 INSTANCE_COUNTER.decrementAndGet();
331 }
332 }
333 }
334
335 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
336
337 checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
338
339 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
340 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
341 for (int i = 0; i < wheel.length; i ++) {
342 wheel[i] = new HashedWheelBucket();
343 }
344 return wheel;
345 }
346
347 private static int normalizeTicksPerWheel(int ticksPerWheel) {
348 int normalizedTicksPerWheel = 1;
349 while (normalizedTicksPerWheel < ticksPerWheel) {
350 normalizedTicksPerWheel <<= 1;
351 }
352 return normalizedTicksPerWheel;
353 }
354
355
356
357
358
359
360
361
362 public void start() {
363 switch (WORKER_STATE_UPDATER.get(this)) {
364 case WORKER_STATE_INIT:
365 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
366 workerThread.start();
367 }
368 break;
369 case WORKER_STATE_STARTED:
370 break;
371 case WORKER_STATE_SHUTDOWN:
372 throw new IllegalStateException("cannot be started once stopped");
373 default:
374 throw new Error("Invalid WorkerState");
375 }
376
377
378 while (startTime == 0) {
379 try {
380 startTimeInitialized.await();
381 } catch (InterruptedException ignore) {
382
383 }
384 }
385 }
386
387 @Override
388 public Set<Timeout> stop() {
389 if (Thread.currentThread() == workerThread) {
390 throw new IllegalStateException(
391 HashedWheelTimer.class.getSimpleName() +
392 ".stop() cannot be called from " +
393 TimerTask.class.getSimpleName());
394 }
395
396 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
397
398 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
399 INSTANCE_COUNTER.decrementAndGet();
400 if (leak != null) {
401 boolean closed = leak.close(this);
402 assert closed;
403 }
404 }
405
406 return Collections.emptySet();
407 }
408
409 try {
410 boolean interrupted = false;
411 while (workerThread.isAlive()) {
412 workerThread.interrupt();
413 try {
414 workerThread.join(100);
415 } catch (InterruptedException ignored) {
416 interrupted = true;
417 }
418 }
419
420 if (interrupted) {
421 Thread.currentThread().interrupt();
422 }
423 } finally {
424 INSTANCE_COUNTER.decrementAndGet();
425 if (leak != null) {
426 boolean closed = leak.close(this);
427 assert closed;
428 }
429 }
430 return worker.unprocessedTimeouts();
431 }
432
433 @Override
434 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
435 checkNotNull(task, "task");
436 checkNotNull(unit, "unit");
437
438 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
439
440 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
441 pendingTimeouts.decrementAndGet();
442 throw new RejectedExecutionException("Number of pending timeouts ("
443 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
444 + "timeouts (" + maxPendingTimeouts + ")");
445 }
446
447 start();
448
449
450
451 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
452
453
454 if (delay > 0 && deadline < 0) {
455 deadline = Long.MAX_VALUE;
456 }
457 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
458 timeouts.add(timeout);
459 return timeout;
460 }
461
462
463
464
/**
 * Returns the current value of the pending-timeout counter, i.e. the number of
 * timeouts that have been scheduled on this timer and not yet removed.
 */
public long pendingTimeouts() {
return pendingTimeouts.get();
}
468
469 private static void reportTooManyInstances() {
470 if (logger.isErrorEnabled()) {
471 String resourceType = simpleClassName(HashedWheelTimer.class);
472 logger.error("You are creating too many " + resourceType + " instances. " +
473 resourceType + " is a shared resource that must be reused across the JVM, " +
474 "so that only a few instances are created.");
475 }
476 }
477
478 private final class Worker implements Runnable {
479 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
480
481 private long tick;
482
483 @Override
484 public void run() {
485
486 startTime = System.nanoTime();
487 if (startTime == 0) {
488
489 startTime = 1;
490 }
491
492
493 startTimeInitialized.countDown();
494
495 do {
496 final long deadline = waitForNextTick();
497 if (deadline > 0) {
498 int idx = (int) (tick & mask);
499 processCancelledTasks();
500 HashedWheelBucket bucket =
501 wheel[idx];
502 transferTimeoutsToBuckets();
503 bucket.expireTimeouts(deadline);
504 tick++;
505 }
506 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
507
508
509 for (HashedWheelBucket bucket: wheel) {
510 bucket.clearTimeouts(unprocessedTimeouts);
511 }
512 for (;;) {
513 HashedWheelTimeout timeout = timeouts.poll();
514 if (timeout == null) {
515 break;
516 }
517 if (!timeout.isCancelled()) {
518 unprocessedTimeouts.add(timeout);
519 }
520 }
521 processCancelledTasks();
522 }
523
524 private void transferTimeoutsToBuckets() {
525
526
527 for (int i = 0; i < 100000; i++) {
528 HashedWheelTimeout timeout = timeouts.poll();
529 if (timeout == null) {
530
531 break;
532 }
533 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
534
535 continue;
536 }
537
538 long calculated = timeout.deadline / tickDuration;
539 timeout.remainingRounds = (calculated - tick) / wheel.length;
540
541 final long ticks = Math.max(calculated, tick);
542 int stopIndex = (int) (ticks & mask);
543
544 HashedWheelBucket bucket = wheel[stopIndex];
545 bucket.addTimeout(timeout);
546 }
547 }
548
549 private void processCancelledTasks() {
550 for (;;) {
551 HashedWheelTimeout timeout = cancelledTimeouts.poll();
552 if (timeout == null) {
553
554 break;
555 }
556 try {
557 timeout.remove();
558 } catch (Throwable t) {
559 if (logger.isWarnEnabled()) {
560 logger.warn("An exception was thrown while process a cancellation task", t);
561 }
562 }
563 }
564 }
565
566
567
568
569
570
571
572 private long waitForNextTick() {
573 long deadline = tickDuration * (tick + 1);
574
575 for (;;) {
576 final long currentTime = System.nanoTime() - startTime;
577 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
578
579 if (sleepTimeMs <= 0) {
580 if (currentTime == Long.MIN_VALUE) {
581 return -Long.MAX_VALUE;
582 } else {
583 return currentTime;
584 }
585 }
586
587
588
589
590
591
592 if (PlatformDependent.isWindows()) {
593 sleepTimeMs = sleepTimeMs / 10 * 10;
594 if (sleepTimeMs == 0) {
595 sleepTimeMs = 1;
596 }
597 }
598
599 try {
600 Thread.sleep(sleepTimeMs);
601 } catch (InterruptedException ignored) {
602 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
603 return Long.MIN_VALUE;
604 }
605 }
606 }
607 }
608
609 public Set<Timeout> unprocessedTimeouts() {
610 return Collections.unmodifiableSet(unprocessedTimeouts);
611 }
612 }
613
614 private static final class HashedWheelTimeout implements Timeout, Runnable {
615
616 private static final int ST_INIT = 0;
617 private static final int ST_CANCELLED = 1;
618 private static final int ST_EXPIRED = 2;
619 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
620 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
621
622 private final HashedWheelTimer timer;
623 private final TimerTask task;
624 private final long deadline;
625
626 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
627 private volatile int state = ST_INIT;
628
629
630
631 long remainingRounds;
632
633
634
635 HashedWheelTimeout next;
636 HashedWheelTimeout prev;
637
638
639 HashedWheelBucket bucket;
640
641 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
642 this.timer = timer;
643 this.task = task;
644 this.deadline = deadline;
645 }
646
647 @Override
648 public Timer timer() {
649 return timer;
650 }
651
652 @Override
653 public TimerTask task() {
654 return task;
655 }
656
657 @Override
658 public boolean cancel() {
659
660 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
661 return false;
662 }
663
664
665
666 timer.cancelledTimeouts.add(this);
667 return true;
668 }
669
670 void remove() {
671 HashedWheelBucket bucket = this.bucket;
672 if (bucket != null) {
673 bucket.remove(this);
674 } else {
675 timer.pendingTimeouts.decrementAndGet();
676 }
677 }
678
679 public boolean compareAndSetState(int expected, int state) {
680 return STATE_UPDATER.compareAndSet(this, expected, state);
681 }
682
683 public int state() {
684 return state;
685 }
686
687 @Override
688 public boolean isCancelled() {
689 return state() == ST_CANCELLED;
690 }
691
692 @Override
693 public boolean isExpired() {
694 return state() == ST_EXPIRED;
695 }
696
697 public void expire() {
698 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
699 return;
700 }
701
702 try {
703 timer.taskExecutor.execute(this);
704 } catch (Throwable t) {
705 if (logger.isWarnEnabled()) {
706 logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
707 + " for execution.", t);
708 }
709 }
710 }
711
712 @Override
713 public void run() {
714 try {
715 task.run(this);
716 } catch (Throwable t) {
717 if (logger.isWarnEnabled()) {
718 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
719 }
720 }
721 }
722
723 @Override
724 public String toString() {
725 final long currentTime = System.nanoTime();
726 long remaining = deadline - currentTime + timer.startTime;
727
728 StringBuilder buf = new StringBuilder(192)
729 .append(simpleClassName(this))
730 .append('(')
731 .append("deadline: ");
732 if (remaining > 0) {
733 buf.append(remaining)
734 .append(" ns later");
735 } else if (remaining < 0) {
736 buf.append(-remaining)
737 .append(" ns ago");
738 } else {
739 buf.append("now");
740 }
741
742 if (isCancelled()) {
743 buf.append(", cancelled");
744 }
745
746 return buf.append(", task: ")
747 .append(task())
748 .append(')')
749 .toString();
750 }
751 }
752
753
754
755
756
757
758 private static final class HashedWheelBucket {
759
760 private HashedWheelTimeout head;
761 private HashedWheelTimeout tail;
762
763
764
765
766 public void addTimeout(HashedWheelTimeout timeout) {
767 assert timeout.bucket == null;
768 timeout.bucket = this;
769 if (head == null) {
770 head = tail = timeout;
771 } else {
772 tail.next = timeout;
773 timeout.prev = tail;
774 tail = timeout;
775 }
776 }
777
778
779
780
781 public void expireTimeouts(long deadline) {
782 HashedWheelTimeout timeout = head;
783
784
785 while (timeout != null) {
786 HashedWheelTimeout next = timeout.next;
787 if (timeout.remainingRounds <= 0) {
788 next = remove(timeout);
789 if (timeout.deadline <= deadline) {
790 timeout.expire();
791 } else {
792
793 throw new IllegalStateException(String.format(
794 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
795 }
796 } else if (timeout.isCancelled()) {
797 next = remove(timeout);
798 } else {
799 timeout.remainingRounds --;
800 }
801 timeout = next;
802 }
803 }
804
805 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
806 HashedWheelTimeout next = timeout.next;
807
808 if (timeout.prev != null) {
809 timeout.prev.next = next;
810 }
811 if (timeout.next != null) {
812 timeout.next.prev = timeout.prev;
813 }
814
815 if (timeout == head) {
816
817 if (timeout == tail) {
818 tail = null;
819 head = null;
820 } else {
821 head = next;
822 }
823 } else if (timeout == tail) {
824
825 tail = timeout.prev;
826 }
827
828 timeout.prev = null;
829 timeout.next = null;
830 timeout.bucket = null;
831 timeout.timer.pendingTimeouts.decrementAndGet();
832 return next;
833 }
834
835
836
837
838 public void clearTimeouts(Set<Timeout> set) {
839 for (;;) {
840 HashedWheelTimeout timeout = pollTimeout();
841 if (timeout == null) {
842 return;
843 }
844 if (timeout.isExpired() || timeout.isCancelled()) {
845 continue;
846 }
847 set.add(timeout);
848 }
849 }
850
851 private HashedWheelTimeout pollTimeout() {
852 HashedWheelTimeout head = this.head;
853 if (head == null) {
854 return null;
855 }
856 HashedWheelTimeout next = head.next;
857 if (next == null) {
858 tail = this.head = null;
859 } else {
860 this.head = next;
861 next.prev = null;
862 }
863
864
865 head.next = null;
866 head.prev = null;
867 head.bucket = null;
868 return head;
869 }
870 }
871 }