1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import static io.netty.util.internal.ObjectUtil.checkInRange;
19 import static io.netty.util.internal.ObjectUtil.checkPositive;
20 import static io.netty.util.internal.ObjectUtil.checkNotNull;
21
22 import io.netty.util.concurrent.ImmediateExecutor;
23 import io.netty.util.internal.MathUtil;
24 import io.netty.util.internal.PlatformDependent;
25 import io.netty.util.internal.logging.InternalLogger;
26 import io.netty.util.internal.logging.InternalLoggerFactory;
27
28 import java.util.Collections;
29 import java.util.HashSet;
30 import java.util.Queue;
31 import java.util.Set;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
41 import java.util.concurrent.atomic.AtomicLong;
42
43 import static io.netty.util.internal.StringUtil.simpleClassName;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 public class HashedWheelTimer implements Timer {
87
88 static final InternalLogger logger =
89 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
90
91 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
92 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
93 private static final int INSTANCE_COUNT_LIMIT = 64;
94 private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
95 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
96 .newResourceLeakDetector(HashedWheelTimer.class, 1);
97
98 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
99 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
100
101 private final ResourceLeakTracker<HashedWheelTimer> leak;
102 private final Worker worker = new Worker();
103 private final Thread workerThread;
104
105 public static final int WORKER_STATE_INIT = 0;
106 public static final int WORKER_STATE_STARTED = 1;
107 public static final int WORKER_STATE_SHUTDOWN = 2;
108 @SuppressWarnings({"unused", "FieldMayBeFinal"})
109 private volatile int workerState;
110
111 private final long tickDuration;
112 private final HashedWheelBucket[] wheel;
113 private final int mask;
114 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
115 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
116 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
117 private final AtomicLong pendingTimeouts = new AtomicLong(0);
118 private final long maxPendingTimeouts;
119 private final Executor taskExecutor;
120
121 private volatile long startTime;
122
123
124
125
126
127
/**
 * Creates a timer whose worker thread is produced by
 * {@link Executors#defaultThreadFactory()}; all remaining settings are the defaults
 * chosen by the constructor this one delegates to.
 */
public HashedWheelTimer() {
    this(Executors.defaultThreadFactory());
}
131
132
133
134
135
136
137
138
139
140
141
/**
 * Creates a timer with the given tick duration, using
 * {@link Executors#defaultThreadFactory()} for the worker thread.
 *
 * @param tickDuration the duration between two ticks
 * @param unit         the time unit of {@code tickDuration}
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
    this(Executors.defaultThreadFactory(), tickDuration, unit);
}
145
146
147
148
149
150
151
152
153
154
155
/**
 * Creates a timer with the given tick duration and wheel size, using
 * {@link Executors#defaultThreadFactory()} for the worker thread.
 *
 * @param tickDuration  the duration between two ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
159
160
161
162
163
164
165
166
167
168
/**
 * Creates a timer with a tick duration of 100 milliseconds.
 *
 * @param threadFactory the factory that creates the worker thread
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
172
173
174
175
176
177
178
179
180
181
182
183
/**
 * Creates a timer with a requested wheel size of 512 slots.
 *
 * @param threadFactory the factory that creates the worker thread
 * @param tickDuration  the duration between two ticks
 * @param unit          the time unit of {@code tickDuration}
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);
}
188
189
190
191
192
193
194
195
196
197
198
199
200
/**
 * Creates a timer with leak detection switched on.
 *
 * @param threadFactory the factory that creates the worker thread
 * @param tickDuration  the duration between two ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/**
 * Creates a timer with {@code maxPendingTimeouts} set to {@code -1}; {@code newTimeout}
 * only enforces the limit when it is greater than zero, so this means "no limit".
 *
 * @param threadFactory the factory that creates the worker thread
 * @param tickDuration  the duration between two ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 * @param leakDetection {@code true} to always track this instance with the leak detector
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
        boolean leakDetection) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
/**
 * Creates a timer that hands expired tasks to {@code ImmediateExecutor.INSTANCE}.
 *
 * @param threadFactory      the factory that creates the worker thread
 * @param tickDuration       the duration between two ticks
 * @param unit               the time unit of {@code tickDuration}
 * @param ticksPerWheel      the requested number of slots in the wheel
 * @param leakDetection      {@code true} to always track this instance with the leak detector
 * @param maxPendingTimeouts the maximum number of pending timeouts; only enforced by
 *                           {@code newTimeout} when greater than zero
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
        boolean leakDetection, long maxPendingTimeouts) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, maxPendingTimeouts,
            ImmediateExecutor.INSTANCE);
}
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278 public HashedWheelTimer(
279 ThreadFactory threadFactory,
280 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
281 long maxPendingTimeouts, Executor taskExecutor) {
282
283 checkNotNull(threadFactory, "threadFactory");
284 checkNotNull(unit, "unit");
285 checkPositive(tickDuration, "tickDuration");
286 checkPositive(ticksPerWheel, "ticksPerWheel");
287 this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
288
289
290 wheel = createWheel(ticksPerWheel);
291 mask = wheel.length - 1;
292
293
294 long duration = unit.toNanos(tickDuration);
295
296
297 if (duration >= Long.MAX_VALUE / wheel.length) {
298 throw new IllegalArgumentException(String.format(
299 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
300 tickDuration, Long.MAX_VALUE / wheel.length));
301 }
302
303 if (duration < MILLISECOND_NANOS) {
304 logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
305 tickDuration, MILLISECOND_NANOS);
306 this.tickDuration = MILLISECOND_NANOS;
307 } else {
308 this.tickDuration = duration;
309 }
310
311 workerThread = threadFactory.newThread(worker);
312
313 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
314
315 this.maxPendingTimeouts = maxPendingTimeouts;
316
317 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
318 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
319 reportTooManyInstances();
320 }
321 }
322
323 @Override
324 protected void finalize() throws Throwable {
325 try {
326 super.finalize();
327 } finally {
328
329
330 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
331 INSTANCE_COUNTER.decrementAndGet();
332 }
333 }
334 }
335
336 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
337 ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
338
339 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
340 for (int i = 0; i < wheel.length; i ++) {
341 wheel[i] = new HashedWheelBucket();
342 }
343 return wheel;
344 }
345
346
347
348
349
350
351
352
353 public void start() {
354 switch (WORKER_STATE_UPDATER.get(this)) {
355 case WORKER_STATE_INIT:
356 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
357 workerThread.start();
358 }
359 break;
360 case WORKER_STATE_STARTED:
361 break;
362 case WORKER_STATE_SHUTDOWN:
363 throw new IllegalStateException("cannot be started once stopped");
364 default:
365 throw new Error("Invalid WorkerState");
366 }
367
368
369 while (startTime == 0) {
370 try {
371 startTimeInitialized.await();
372 } catch (InterruptedException ignore) {
373
374 }
375 }
376 }
377
378 @Override
379 public Set<Timeout> stop() {
380 if (Thread.currentThread() == workerThread) {
381 throw new IllegalStateException(
382 HashedWheelTimer.class.getSimpleName() +
383 ".stop() cannot be called from " +
384 TimerTask.class.getSimpleName());
385 }
386
387 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
388
389 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
390 INSTANCE_COUNTER.decrementAndGet();
391 if (leak != null) {
392 boolean closed = leak.close(this);
393 assert closed;
394 }
395 }
396
397 return Collections.emptySet();
398 }
399
400 try {
401 boolean interrupted = false;
402 while (workerThread.isAlive()) {
403 workerThread.interrupt();
404 try {
405 workerThread.join(100);
406 } catch (InterruptedException ignored) {
407 interrupted = true;
408 }
409 }
410
411 if (interrupted) {
412 Thread.currentThread().interrupt();
413 }
414 } finally {
415 INSTANCE_COUNTER.decrementAndGet();
416 if (leak != null) {
417 boolean closed = leak.close(this);
418 assert closed;
419 }
420 }
421 Set<Timeout> unprocessed = worker.unprocessedTimeouts();
422 Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
423 for (Timeout timeout : unprocessed) {
424 if (timeout.cancel()) {
425 cancelled.add(timeout);
426 }
427 }
428 return cancelled;
429 }
430
431 @Override
432 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
433 checkNotNull(task, "task");
434 checkNotNull(unit, "unit");
435
436 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
437
438 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
439 pendingTimeouts.decrementAndGet();
440 throw new RejectedExecutionException("Number of pending timeouts ("
441 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
442 + "timeouts (" + maxPendingTimeouts + ")");
443 }
444
445 start();
446
447
448
449 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
450
451
452 if (delay > 0 && deadline < 0) {
453 deadline = Long.MAX_VALUE;
454 }
455 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
456 timeouts.add(timeout);
457 return timeout;
458 }
459
460
461
462
463 public long pendingTimeouts() {
464 return pendingTimeouts.get();
465 }
466
467 private static void reportTooManyInstances() {
468 if (logger.isErrorEnabled()) {
469 String resourceType = simpleClassName(HashedWheelTimer.class);
470 logger.error("You are creating too many " + resourceType + " instances. " +
471 resourceType + " is a shared resource that must be reused across the JVM, " +
472 "so that only a few instances are created.");
473 }
474 }
475
476 private final class Worker implements Runnable {
477 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
478
479 private long tick;
480
481 @Override
482 public void run() {
483
484 startTime = System.nanoTime();
485 if (startTime == 0) {
486
487 startTime = 1;
488 }
489
490
491 startTimeInitialized.countDown();
492
493 do {
494 final long deadline = waitForNextTick();
495 if (deadline > 0) {
496 int idx = (int) (tick & mask);
497 processCancelledTasks();
498 HashedWheelBucket bucket =
499 wheel[idx];
500 transferTimeoutsToBuckets();
501 bucket.expireTimeouts(deadline);
502 tick++;
503 }
504 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
505
506
507 for (HashedWheelBucket bucket: wheel) {
508 bucket.clearTimeouts(unprocessedTimeouts);
509 }
510 for (;;) {
511 HashedWheelTimeout timeout = timeouts.poll();
512 if (timeout == null) {
513 break;
514 }
515 if (!timeout.isCancelled()) {
516 unprocessedTimeouts.add(timeout);
517 }
518 }
519 processCancelledTasks();
520 }
521
522 private void transferTimeoutsToBuckets() {
523
524
525 for (int i = 0; i < 100000; i++) {
526 HashedWheelTimeout timeout = timeouts.poll();
527 if (timeout == null) {
528
529 break;
530 }
531 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
532
533 continue;
534 }
535
536 long calculated = timeout.deadline / tickDuration;
537 timeout.remainingRounds = (calculated - tick) / wheel.length;
538
539 final long ticks = Math.max(calculated, tick);
540 int stopIndex = (int) (ticks & mask);
541
542 HashedWheelBucket bucket = wheel[stopIndex];
543 bucket.addTimeout(timeout);
544 }
545 }
546
547 private void processCancelledTasks() {
548 for (;;) {
549 HashedWheelTimeout timeout = cancelledTimeouts.poll();
550 if (timeout == null) {
551
552 break;
553 }
554 try {
555 timeout.remove();
556 } catch (Throwable t) {
557 if (logger.isWarnEnabled()) {
558 logger.warn("An exception was thrown while process a cancellation task", t);
559 }
560 }
561 }
562 }
563
564
565
566
567
568
569
570 private long waitForNextTick() {
571 long deadline = tickDuration * (tick + 1);
572
573 for (;;) {
574 final long currentTime = System.nanoTime() - startTime;
575 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
576
577 if (sleepTimeMs <= 0) {
578 if (currentTime == Long.MIN_VALUE) {
579 return -Long.MAX_VALUE;
580 } else {
581 return currentTime;
582 }
583 }
584
585
586
587
588
589
590 if (PlatformDependent.isWindows()) {
591 sleepTimeMs = sleepTimeMs / 10 * 10;
592 if (sleepTimeMs == 0) {
593 sleepTimeMs = 1;
594 }
595 }
596
597 try {
598 Thread.sleep(sleepTimeMs);
599 } catch (InterruptedException ignored) {
600 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
601 return Long.MIN_VALUE;
602 }
603 }
604 }
605 }
606
607 public Set<Timeout> unprocessedTimeouts() {
608 return Collections.unmodifiableSet(unprocessedTimeouts);
609 }
610 }
611
612 private static final class HashedWheelTimeout implements Timeout, Runnable {
613
614 private static final int ST_INIT = 0;
615 private static final int ST_CANCELLED = 1;
616 private static final int ST_EXPIRED = 2;
617 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
618 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
619
620 private final HashedWheelTimer timer;
621 private final TimerTask task;
622 private final long deadline;
623
624 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
625 private volatile int state = ST_INIT;
626
627
628
629 long remainingRounds;
630
631
632
633 HashedWheelTimeout next;
634 HashedWheelTimeout prev;
635
636
637 HashedWheelBucket bucket;
638
639 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
640 this.timer = timer;
641 this.task = task;
642 this.deadline = deadline;
643 }
644
645 @Override
646 public Timer timer() {
647 return timer;
648 }
649
650 @Override
651 public TimerTask task() {
652 return task;
653 }
654
655 @Override
656 public boolean cancel() {
657
658 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
659 return false;
660 }
661
662
663
664 timer.cancelledTimeouts.add(this);
665 return true;
666 }
667
668 void remove() {
669 HashedWheelBucket bucket = this.bucket;
670 if (bucket != null) {
671 bucket.remove(this);
672 } else {
673 timer.pendingTimeouts.decrementAndGet();
674 }
675 }
676
677 public boolean compareAndSetState(int expected, int state) {
678 return STATE_UPDATER.compareAndSet(this, expected, state);
679 }
680
681 public int state() {
682 return state;
683 }
684
685 @Override
686 public boolean isCancelled() {
687 return state() == ST_CANCELLED;
688 }
689
690 @Override
691 public boolean isExpired() {
692 return state() == ST_EXPIRED;
693 }
694
695 public void expire() {
696 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
697 return;
698 }
699
700 try {
701 timer.taskExecutor.execute(this);
702 } catch (Throwable t) {
703 if (logger.isWarnEnabled()) {
704 logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
705 + " for execution.", t);
706 }
707 }
708 }
709
710 @Override
711 public void run() {
712 try {
713 task.run(this);
714 } catch (Throwable t) {
715 if (logger.isWarnEnabled()) {
716 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
717 }
718 }
719 }
720
721 @Override
722 public String toString() {
723 final long currentTime = System.nanoTime();
724 long remaining = deadline - currentTime + timer.startTime;
725
726 StringBuilder buf = new StringBuilder(192)
727 .append(simpleClassName(this))
728 .append('(')
729 .append("deadline: ");
730 if (remaining > 0) {
731 buf.append(remaining)
732 .append(" ns later");
733 } else if (remaining < 0) {
734 buf.append(-remaining)
735 .append(" ns ago");
736 } else {
737 buf.append("now");
738 }
739
740 if (isCancelled()) {
741 buf.append(", cancelled");
742 }
743
744 return buf.append(", task: ")
745 .append(task())
746 .append(')')
747 .toString();
748 }
749 }
750
751
752
753
754
755
756 private static final class HashedWheelBucket {
757
758 private HashedWheelTimeout head;
759 private HashedWheelTimeout tail;
760
761
762
763
764 public void addTimeout(HashedWheelTimeout timeout) {
765 assert timeout.bucket == null;
766 timeout.bucket = this;
767 if (head == null) {
768 head = tail = timeout;
769 } else {
770 tail.next = timeout;
771 timeout.prev = tail;
772 tail = timeout;
773 }
774 }
775
776
777
778
779 public void expireTimeouts(long deadline) {
780 HashedWheelTimeout timeout = head;
781
782
783 while (timeout != null) {
784 HashedWheelTimeout next = timeout.next;
785 if (timeout.remainingRounds <= 0) {
786 next = remove(timeout);
787 if (timeout.deadline <= deadline) {
788 timeout.expire();
789 } else {
790
791 throw new IllegalStateException(String.format(
792 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
793 }
794 } else if (timeout.isCancelled()) {
795 next = remove(timeout);
796 } else {
797 timeout.remainingRounds --;
798 }
799 timeout = next;
800 }
801 }
802
803 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
804 HashedWheelTimeout next = timeout.next;
805
806 if (timeout.prev != null) {
807 timeout.prev.next = next;
808 }
809 if (timeout.next != null) {
810 timeout.next.prev = timeout.prev;
811 }
812
813 if (timeout == head) {
814
815 if (timeout == tail) {
816 tail = null;
817 head = null;
818 } else {
819 head = next;
820 }
821 } else if (timeout == tail) {
822
823 tail = timeout.prev;
824 }
825
826 timeout.prev = null;
827 timeout.next = null;
828 timeout.bucket = null;
829 timeout.timer.pendingTimeouts.decrementAndGet();
830 return next;
831 }
832
833
834
835
836 public void clearTimeouts(Set<Timeout> set) {
837 for (;;) {
838 HashedWheelTimeout timeout = pollTimeout();
839 if (timeout == null) {
840 return;
841 }
842 if (timeout.isExpired() || timeout.isCancelled()) {
843 continue;
844 }
845 set.add(timeout);
846 }
847 }
848
849 private HashedWheelTimeout pollTimeout() {
850 HashedWheelTimeout head = this.head;
851 if (head == null) {
852 return null;
853 }
854 HashedWheelTimeout next = head.next;
855 if (next == null) {
856 tail = this.head = null;
857 } else {
858 this.head = next;
859 next.prev = null;
860 }
861
862
863 head.next = null;
864 head.prev = null;
865 head.bucket = null;
866 return head;
867 }
868 }
869 }