1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19 import static io.netty.util.internal.ObjectUtil.checkNotNull;
20
21 import io.netty.util.concurrent.ImmediateExecutor;
22 import io.netty.util.internal.MathUtil;
23 import io.netty.util.internal.PlatformDependent;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.RejectedExecutionException;
35 import java.util.concurrent.ThreadFactory;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40 import java.util.concurrent.atomic.AtomicLong;
41
42 import static io.netty.util.internal.StringUtil.simpleClassName;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 public class HashedWheelTimer implements Timer {
86
87 static final InternalLogger logger =
88 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
89
90 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
91 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
92 private static final int INSTANCE_COUNT_LIMIT = 64;
93 private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
94 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
95 .newResourceLeakDetector(HashedWheelTimer.class, 1);
96
97 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
98 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
99
100 private final ResourceLeakTracker<HashedWheelTimer> leak;
101 private final Worker worker = new Worker();
102 private final Thread workerThread;
103
104 public static final int WORKER_STATE_INIT = 0;
105 public static final int WORKER_STATE_STARTED = 1;
106 public static final int WORKER_STATE_SHUTDOWN = 2;
107 @SuppressWarnings({"unused", "FieldMayBeFinal"})
108 private volatile int workerState;
109
110 private final long tickDuration;
111 private final HashedWheelBucket[] wheel;
112 private final int mask;
113 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
114 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
115 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
116 private final AtomicLong pendingTimeouts = new AtomicLong(0);
117 private final long maxPendingTimeouts;
118 private final Executor taskExecutor;
119
120 private volatile long startTime;
121
122
123
124
125
126
127 public HashedWheelTimer() {
128 this(Executors.defaultThreadFactory());
129 }
130
131
132
133
134
135
136
137
138
139
140
141 public HashedWheelTimer(long tickDuration, TimeUnit unit) {
142 this(Executors.defaultThreadFactory(), tickDuration, unit);
143 }
144
145
146
147
148
149
150
151
152
153
154
155 public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
156 this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
157 }
158
159
160
161
162
163
164
165
166
167
168 public HashedWheelTimer(ThreadFactory threadFactory) {
169 this(threadFactory, 100, TimeUnit.MILLISECONDS);
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183 public HashedWheelTimer(
184 ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
185 this(threadFactory, tickDuration, unit, 512);
186 }
187
188
189
190
191
192
193
194
195
196
197
198
199
200 public HashedWheelTimer(
201 ThreadFactory threadFactory,
202 long tickDuration, TimeUnit unit, int ticksPerWheel) {
203 this(threadFactory, tickDuration, unit, ticksPerWheel, true);
204 }
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221 public HashedWheelTimer(
222 ThreadFactory threadFactory,
223 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
224 this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
225 }
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247 public HashedWheelTimer(
248 ThreadFactory threadFactory,
249 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
250 long maxPendingTimeouts) {
251 this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
252 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
253 }
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277 public HashedWheelTimer(
278 ThreadFactory threadFactory,
279 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
280 long maxPendingTimeouts, Executor taskExecutor) {
281
282 checkNotNull(threadFactory, "threadFactory");
283 checkNotNull(unit, "unit");
284 checkPositive(tickDuration, "tickDuration");
285 checkPositive(ticksPerWheel, "ticksPerWheel");
286 this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
287
288
289 wheel = createWheel(ticksPerWheel);
290 mask = wheel.length - 1;
291
292
293 long duration = unit.toNanos(tickDuration);
294
295
296 if (duration >= Long.MAX_VALUE / wheel.length) {
297 throw new IllegalArgumentException(String.format(
298 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
299 tickDuration, Long.MAX_VALUE / wheel.length));
300 }
301
302 if (duration < MILLISECOND_NANOS) {
303 logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
304 tickDuration, MILLISECOND_NANOS);
305 this.tickDuration = MILLISECOND_NANOS;
306 } else {
307 this.tickDuration = duration;
308 }
309
310 workerThread = threadFactory.newThread(worker);
311
312 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
313
314 this.maxPendingTimeouts = maxPendingTimeouts;
315
316 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
317 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
318 reportTooManyInstances();
319 }
320 }
321
322 @Override
323 protected void finalize() throws Throwable {
324 try {
325 super.finalize();
326 } finally {
327
328
329 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
330 INSTANCE_COUNTER.decrementAndGet();
331 }
332 }
333 }
334
335 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
336 ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
337
338 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
339 for (int i = 0; i < wheel.length; i ++) {
340 wheel[i] = new HashedWheelBucket();
341 }
342 return wheel;
343 }
344
345
346
347
348
349
350
351
352 public void start() {
353 int state = WORKER_STATE_UPDATER.get(this);
354 switch (state) {
355 case WORKER_STATE_INIT:
356 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
357 workerThread.start();
358 }
359 break;
360 case WORKER_STATE_STARTED:
361 break;
362 case WORKER_STATE_SHUTDOWN:
363 throw new IllegalStateException("cannot be started once stopped");
364 default:
365 throw new Error("Invalid WorkerState: " + state);
366 }
367
368
369 while (startTime == 0) {
370 try {
371 startTimeInitialized.await();
372 } catch (InterruptedException ignore) {
373
374 }
375 }
376 }
377
378 @Override
379 public Set<Timeout> stop() {
380 if (Thread.currentThread() == workerThread) {
381 throw new IllegalStateException(
382 HashedWheelTimer.class.getSimpleName() +
383 ".stop() cannot be called from " +
384 TimerTask.class.getSimpleName());
385 }
386
387 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
388
389 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
390 INSTANCE_COUNTER.decrementAndGet();
391 if (leak != null) {
392 boolean closed = leak.close(this);
393 assert closed;
394 }
395 }
396
397 return Collections.emptySet();
398 }
399
400 try {
401 boolean interrupted = false;
402 while (workerThread.isAlive()) {
403 workerThread.interrupt();
404 try {
405 workerThread.join(100);
406 } catch (InterruptedException ignored) {
407 interrupted = true;
408 }
409 }
410
411 if (interrupted) {
412 Thread.currentThread().interrupt();
413 }
414 } finally {
415 INSTANCE_COUNTER.decrementAndGet();
416 if (leak != null) {
417 boolean closed = leak.close(this);
418 assert closed;
419 }
420 }
421 Set<Timeout> unprocessed = worker.unprocessedTimeouts();
422 Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
423 for (Timeout timeout : unprocessed) {
424 if (timeout.cancel()) {
425 cancelled.add(timeout);
426 }
427 }
428 return cancelled;
429 }
430
431 @Override
432 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
433 checkNotNull(task, "task");
434 checkNotNull(unit, "unit");
435
436 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
437
438 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
439 pendingTimeouts.decrementAndGet();
440 throw new RejectedExecutionException("Number of pending timeouts ("
441 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
442 + "timeouts (" + maxPendingTimeouts + ")");
443 }
444
445 start();
446
447
448
449 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
450
451
452 if (delay > 0 && deadline < 0) {
453 deadline = Long.MAX_VALUE;
454 }
455 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
456 timeouts.add(timeout);
457 return timeout;
458 }
459
460
461
462
463 public long pendingTimeouts() {
464 return pendingTimeouts.get();
465 }
466
467 private static void reportTooManyInstances() {
468 if (logger.isErrorEnabled()) {
469 String resourceType = simpleClassName(HashedWheelTimer.class);
470 logger.error("You are creating too many " + resourceType + " instances. " +
471 resourceType + " is a shared resource that must be reused across the JVM, " +
472 "so that only a few instances are created.");
473 }
474 }
475
476 private final class Worker implements Runnable {
477 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
478
479 private long tick;
480
481 @Override
482 public void run() {
483
484 startTime = System.nanoTime();
485 if (startTime == 0) {
486
487 startTime = 1;
488 }
489
490
491 startTimeInitialized.countDown();
492
493 do {
494 final long deadline = waitForNextTick();
495 if (deadline > 0) {
496 int idx = (int) (tick & mask);
497 processCancelledTasks();
498 HashedWheelBucket bucket =
499 wheel[idx];
500 transferTimeoutsToBuckets();
501 bucket.expireTimeouts(deadline);
502 tick++;
503 }
504 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
505
506
507 for (HashedWheelBucket bucket: wheel) {
508 bucket.clearTimeouts(unprocessedTimeouts);
509 }
510 for (;;) {
511 HashedWheelTimeout timeout = timeouts.poll();
512 if (timeout == null) {
513 break;
514 }
515 if (!timeout.isCancelled()) {
516 unprocessedTimeouts.add(timeout);
517 }
518 }
519 processCancelledTasks();
520 }
521
522 private void transferTimeoutsToBuckets() {
523
524
525 for (int i = 0; i < 100000; i++) {
526 HashedWheelTimeout timeout = timeouts.poll();
527 if (timeout == null) {
528
529 break;
530 }
531 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
532
533 continue;
534 }
535
536 long calculated = timeout.deadline / tickDuration;
537 timeout.remainingRounds = (calculated - tick) / wheel.length;
538
539 final long ticks = Math.max(calculated, tick);
540 int stopIndex = (int) (ticks & mask);
541
542 HashedWheelBucket bucket = wheel[stopIndex];
543 bucket.addTimeout(timeout);
544 }
545 }
546
547 private void processCancelledTasks() {
548 for (;;) {
549 HashedWheelTimeout timeout = cancelledTimeouts.poll();
550 if (timeout == null) {
551
552 break;
553 }
554 try {
555 timeout.removeAfterCancellation();
556 } catch (Throwable t) {
557 if (logger.isWarnEnabled()) {
558 logger.warn("An exception was thrown while process a cancellation task", t);
559 }
560 }
561 }
562 }
563
564
565
566
567
568
569
570 private long waitForNextTick() {
571 long deadline = tickDuration * (tick + 1);
572
573 for (;;) {
574 final long currentTime = System.nanoTime() - startTime;
575 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
576
577 if (sleepTimeMs <= 0) {
578 if (currentTime == Long.MIN_VALUE) {
579 return -Long.MAX_VALUE;
580 } else {
581 return currentTime;
582 }
583 }
584
585
586
587
588
589
590 if (PlatformDependent.isWindows()) {
591 sleepTimeMs = sleepTimeMs / 10 * 10;
592 if (sleepTimeMs == 0) {
593 sleepTimeMs = 1;
594 }
595 }
596
597 try {
598 Thread.sleep(sleepTimeMs);
599 } catch (InterruptedException ignored) {
600 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
601 return Long.MIN_VALUE;
602 }
603 }
604 }
605 }
606
607 public Set<Timeout> unprocessedTimeouts() {
608 return Collections.unmodifiableSet(unprocessedTimeouts);
609 }
610 }
611
612 private static final class HashedWheelTimeout implements Timeout, Runnable {
613
614 private static final int ST_INIT = 0;
615 private static final int ST_CANCELLED = 1;
616 private static final int ST_EXPIRED = 2;
617 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
618 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
619
620 private final HashedWheelTimer timer;
621 private final TimerTask task;
622 private final long deadline;
623
624 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
625 private volatile int state = ST_INIT;
626
627
628
629 long remainingRounds;
630
631
632
633 HashedWheelTimeout next;
634 HashedWheelTimeout prev;
635
636
637 HashedWheelBucket bucket;
638
639 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
640 this.timer = timer;
641 this.task = task;
642 this.deadline = deadline;
643 }
644
645 @Override
646 public Timer timer() {
647 return timer;
648 }
649
650 @Override
651 public TimerTask task() {
652 return task;
653 }
654
655 @Override
656 public boolean cancel() {
657
658 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
659 return false;
660 }
661
662
663
664 timer.cancelledTimeouts.add(this);
665 return true;
666 }
667
668 private void remove() {
669 HashedWheelBucket bucket = this.bucket;
670 if (bucket != null) {
671 bucket.remove(this);
672 }
673 timer.pendingTimeouts.decrementAndGet();
674 }
675 void removeAfterCancellation() {
676 remove();
677 task.cancelled(this);
678 }
679
680 public boolean compareAndSetState(int expected, int state) {
681 return STATE_UPDATER.compareAndSet(this, expected, state);
682 }
683
684 public int state() {
685 return state;
686 }
687
688 @Override
689 public boolean isCancelled() {
690 return state() == ST_CANCELLED;
691 }
692
693 @Override
694 public boolean isExpired() {
695 return state() == ST_EXPIRED;
696 }
697
698 public void expire() {
699 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
700 return;
701 }
702
703 try {
704 remove();
705 timer.taskExecutor.execute(this);
706 } catch (Throwable t) {
707 if (logger.isWarnEnabled()) {
708 logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
709 + " for execution.", t);
710 }
711 }
712 }
713
714 @Override
715 public void run() {
716 try {
717 task.run(this);
718 } catch (Throwable t) {
719 if (logger.isWarnEnabled()) {
720 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
721 }
722 }
723 }
724
725 @Override
726 public String toString() {
727 final long currentTime = System.nanoTime();
728 long remaining = deadline - currentTime + timer.startTime;
729
730 StringBuilder buf = new StringBuilder(192)
731 .append(simpleClassName(this))
732 .append('(')
733 .append("deadline: ");
734 if (remaining > 0) {
735 buf.append(remaining)
736 .append(" ns later");
737 } else if (remaining < 0) {
738 buf.append(-remaining)
739 .append(" ns ago");
740 } else {
741 buf.append("now");
742 }
743
744 if (isCancelled()) {
745 buf.append(", cancelled");
746 }
747
748 return buf.append(", task: ")
749 .append(task())
750 .append(')')
751 .toString();
752 }
753 }
754
755
756
757
758
759
760 private static final class HashedWheelBucket {
761
762 private HashedWheelTimeout head;
763 private HashedWheelTimeout tail;
764
765
766
767
768 public void addTimeout(HashedWheelTimeout timeout) {
769 assert timeout.bucket == null;
770 timeout.bucket = this;
771 if (head == null) {
772 head = tail = timeout;
773 } else {
774 tail.next = timeout;
775 timeout.prev = tail;
776 tail = timeout;
777 }
778 }
779
780
781
782
783 public void expireTimeouts(long deadline) {
784 HashedWheelTimeout timeout = head;
785
786
787 while (timeout != null) {
788 HashedWheelTimeout next = timeout.next;
789 if (timeout.remainingRounds <= 0) {
790 if (timeout.deadline <= deadline) {
791 timeout.expire();
792 } else {
793
794 throw new IllegalStateException(String.format(
795 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
796 }
797 } else if (!timeout.isCancelled()) {
798 timeout.remainingRounds --;
799 }
800 timeout = next;
801 }
802 }
803
804 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
805 HashedWheelTimeout next = timeout.next;
806
807 if (timeout.prev != null) {
808 timeout.prev.next = next;
809 }
810 if (timeout.next != null) {
811 timeout.next.prev = timeout.prev;
812 }
813
814 if (timeout == head) {
815
816 if (timeout == tail) {
817 tail = null;
818 head = null;
819 } else {
820 head = next;
821 }
822 } else if (timeout == tail) {
823
824 tail = timeout.prev;
825 }
826
827 timeout.prev = null;
828 timeout.next = null;
829 timeout.bucket = null;
830 return next;
831 }
832
833
834
835
836 public void clearTimeouts(Set<Timeout> set) {
837 for (;;) {
838 HashedWheelTimeout timeout = pollTimeout();
839 if (timeout == null) {
840 return;
841 }
842 if (timeout.isExpired() || timeout.isCancelled()) {
843 continue;
844 }
845 set.add(timeout);
846 }
847 }
848
849 private HashedWheelTimeout pollTimeout() {
850 HashedWheelTimeout head = this.head;
851 if (head == null) {
852 return null;
853 }
854 HashedWheelTimeout next = head.next;
855 if (next == null) {
856 tail = this.head = null;
857 } else {
858 this.head = next;
859 next.prev = null;
860 }
861
862
863 head.next = null;
864 head.prev = null;
865 head.bucket = null;
866 return head;
867 }
868 }
869 }