1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import static io.netty.util.internal.ObjectUtil.checkInRange;
19 import static io.netty.util.internal.ObjectUtil.checkPositive;
20 import static io.netty.util.internal.ObjectUtil.checkNotNull;
21
22 import io.netty.util.concurrent.ImmediateExecutor;
23 import io.netty.util.internal.MathUtil;
24 import io.netty.util.internal.PlatformDependent;
25 import io.netty.util.internal.logging.InternalLogger;
26 import io.netty.util.internal.logging.InternalLoggerFactory;
27
28 import java.util.Collections;
29 import java.util.HashSet;
30 import java.util.Queue;
31 import java.util.Set;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
41 import java.util.concurrent.atomic.AtomicLong;
42
43 import static io.netty.util.internal.StringUtil.simpleClassName;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 public class HashedWheelTimer implements Timer {
87
88 static final InternalLogger logger =
89 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
90
91 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
92 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
93 private static final int INSTANCE_COUNT_LIMIT = 64;
94 private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
95 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
96 .newResourceLeakDetector(HashedWheelTimer.class, 1);
97
98 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
99 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
100
101 private final ResourceLeakTracker<HashedWheelTimer> leak;
102 private final Worker worker = new Worker();
103 private final Thread workerThread;
104
105 public static final int WORKER_STATE_INIT = 0;
106 public static final int WORKER_STATE_STARTED = 1;
107 public static final int WORKER_STATE_SHUTDOWN = 2;
108 @SuppressWarnings({"unused", "FieldMayBeFinal"})
109 private volatile int workerState;
110
111 private final long tickDuration;
112 private final HashedWheelBucket[] wheel;
113 private final int mask;
114 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
115 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
116 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
117 private final AtomicLong pendingTimeouts = new AtomicLong(0);
118 private final long maxPendingTimeouts;
119 private final Executor taskExecutor;
120
121 private volatile long startTime;
122
123
124
125
126
127
128 public HashedWheelTimer() {
129 this(Executors.defaultThreadFactory());
130 }
131
132
133
134
135
136
137
138
139
140
141
142 public HashedWheelTimer(long tickDuration, TimeUnit unit) {
143 this(Executors.defaultThreadFactory(), tickDuration, unit);
144 }
145
146
147
148
149
150
151
152
153
154
155
156 public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
157 this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
158 }
159
160
161
162
163
164
165
166
167
168
169 public HashedWheelTimer(ThreadFactory threadFactory) {
170 this(threadFactory, 100, TimeUnit.MILLISECONDS);
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184 public HashedWheelTimer(
185 ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
186 this(threadFactory, tickDuration, unit, 512);
187 }
188
189
190
191
192
193
194
195
196
197
198
199
200
201 public HashedWheelTimer(
202 ThreadFactory threadFactory,
203 long tickDuration, TimeUnit unit, int ticksPerWheel) {
204 this(threadFactory, tickDuration, unit, ticksPerWheel, true);
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222 public HashedWheelTimer(
223 ThreadFactory threadFactory,
224 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
225 this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
226 }
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248 public HashedWheelTimer(
249 ThreadFactory threadFactory,
250 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
251 long maxPendingTimeouts) {
252 this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
253 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
254 }
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278 public HashedWheelTimer(
279 ThreadFactory threadFactory,
280 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
281 long maxPendingTimeouts, Executor taskExecutor) {
282
283 checkNotNull(threadFactory, "threadFactory");
284 checkNotNull(unit, "unit");
285 checkPositive(tickDuration, "tickDuration");
286 checkPositive(ticksPerWheel, "ticksPerWheel");
287 this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
288
289
290 wheel = createWheel(ticksPerWheel);
291 mask = wheel.length - 1;
292
293
294 long duration = unit.toNanos(tickDuration);
295
296
297 if (duration >= Long.MAX_VALUE / wheel.length) {
298 throw new IllegalArgumentException(String.format(
299 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
300 tickDuration, Long.MAX_VALUE / wheel.length));
301 }
302
303 if (duration < MILLISECOND_NANOS) {
304 logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
305 tickDuration, MILLISECOND_NANOS);
306 this.tickDuration = MILLISECOND_NANOS;
307 } else {
308 this.tickDuration = duration;
309 }
310
311 workerThread = threadFactory.newThread(worker);
312
313 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
314
315 this.maxPendingTimeouts = maxPendingTimeouts;
316
317 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
318 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
319 reportTooManyInstances();
320 }
321 }
322
323 @Override
324 protected void finalize() throws Throwable {
325 try {
326 super.finalize();
327 } finally {
328
329
330 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
331 INSTANCE_COUNTER.decrementAndGet();
332 }
333 }
334 }
335
336 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
337 ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
338
339 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
340 for (int i = 0; i < wheel.length; i ++) {
341 wheel[i] = new HashedWheelBucket();
342 }
343 return wheel;
344 }
345
346
347
348
349
350
351
352
353 public void start() {
354 switch (WORKER_STATE_UPDATER.get(this)) {
355 case WORKER_STATE_INIT:
356 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
357 workerThread.start();
358 }
359 break;
360 case WORKER_STATE_STARTED:
361 break;
362 case WORKER_STATE_SHUTDOWN:
363 throw new IllegalStateException("cannot be started once stopped");
364 default:
365 throw new Error("Invalid WorkerState");
366 }
367
368
369 while (startTime == 0) {
370 try {
371 startTimeInitialized.await();
372 } catch (InterruptedException ignore) {
373
374 }
375 }
376 }
377
378 @Override
379 public Set<Timeout> stop() {
380 if (Thread.currentThread() == workerThread) {
381 throw new IllegalStateException(
382 HashedWheelTimer.class.getSimpleName() +
383 ".stop() cannot be called from " +
384 TimerTask.class.getSimpleName());
385 }
386
387 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
388
389 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
390 INSTANCE_COUNTER.decrementAndGet();
391 if (leak != null) {
392 boolean closed = leak.close(this);
393 assert closed;
394 }
395 }
396
397 return Collections.emptySet();
398 }
399
400 try {
401 boolean interrupted = false;
402 while (workerThread.isAlive()) {
403 workerThread.interrupt();
404 try {
405 workerThread.join(100);
406 } catch (InterruptedException ignored) {
407 interrupted = true;
408 }
409 }
410
411 if (interrupted) {
412 Thread.currentThread().interrupt();
413 }
414 } finally {
415 INSTANCE_COUNTER.decrementAndGet();
416 if (leak != null) {
417 boolean closed = leak.close(this);
418 assert closed;
419 }
420 }
421 Set<Timeout> unprocessed = worker.unprocessedTimeouts();
422 Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
423 for (Timeout timeout : unprocessed) {
424 if (timeout.cancel()) {
425 cancelled.add(timeout);
426 }
427 }
428 return cancelled;
429 }
430
431 @Override
432 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
433 checkNotNull(task, "task");
434 checkNotNull(unit, "unit");
435
436 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
437
438 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
439 pendingTimeouts.decrementAndGet();
440 throw new RejectedExecutionException("Number of pending timeouts ("
441 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
442 + "timeouts (" + maxPendingTimeouts + ")");
443 }
444
445 start();
446
447
448
449 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
450
451
452 if (delay > 0 && deadline < 0) {
453 deadline = Long.MAX_VALUE;
454 }
455 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
456 timeouts.add(timeout);
457 return timeout;
458 }
459
460
461
462
463 public long pendingTimeouts() {
464 return pendingTimeouts.get();
465 }
466
467 private static void reportTooManyInstances() {
468 if (logger.isErrorEnabled()) {
469 String resourceType = simpleClassName(HashedWheelTimer.class);
470 logger.error("You are creating too many " + resourceType + " instances. " +
471 resourceType + " is a shared resource that must be reused across the JVM, " +
472 "so that only a few instances are created.");
473 }
474 }
475
476 private final class Worker implements Runnable {
477 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
478
479 private long tick;
480
481 @Override
482 public void run() {
483
484 startTime = System.nanoTime();
485 if (startTime == 0) {
486
487 startTime = 1;
488 }
489
490
491 startTimeInitialized.countDown();
492
493 do {
494 final long deadline = waitForNextTick();
495 if (deadline > 0) {
496 int idx = (int) (tick & mask);
497 processCancelledTasks();
498 HashedWheelBucket bucket =
499 wheel[idx];
500 transferTimeoutsToBuckets();
501 bucket.expireTimeouts(deadline);
502 tick++;
503 }
504 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
505
506
507 for (HashedWheelBucket bucket: wheel) {
508 bucket.clearTimeouts(unprocessedTimeouts);
509 }
510 for (;;) {
511 HashedWheelTimeout timeout = timeouts.poll();
512 if (timeout == null) {
513 break;
514 }
515 if (!timeout.isCancelled()) {
516 unprocessedTimeouts.add(timeout);
517 }
518 }
519 processCancelledTasks();
520 }
521
522 private void transferTimeoutsToBuckets() {
523
524
525 for (int i = 0; i < 100000; i++) {
526 HashedWheelTimeout timeout = timeouts.poll();
527 if (timeout == null) {
528
529 break;
530 }
531 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
532
533 continue;
534 }
535
536 long calculated = timeout.deadline / tickDuration;
537 timeout.remainingRounds = (calculated - tick) / wheel.length;
538
539 final long ticks = Math.max(calculated, tick);
540 int stopIndex = (int) (ticks & mask);
541
542 HashedWheelBucket bucket = wheel[stopIndex];
543 bucket.addTimeout(timeout);
544 }
545 }
546
547 private void processCancelledTasks() {
548 for (;;) {
549 HashedWheelTimeout timeout = cancelledTimeouts.poll();
550 if (timeout == null) {
551
552 break;
553 }
554 try {
555 timeout.removeAfterCancellation();
556 } catch (Throwable t) {
557 if (logger.isWarnEnabled()) {
558 logger.warn("An exception was thrown while process a cancellation task", t);
559 }
560 }
561 }
562 }
563
564
565
566
567
568
569
570 private long waitForNextTick() {
571 long deadline = tickDuration * (tick + 1);
572
573 for (;;) {
574 final long currentTime = System.nanoTime() - startTime;
575 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
576
577 if (sleepTimeMs <= 0) {
578 if (currentTime == Long.MIN_VALUE) {
579 return -Long.MAX_VALUE;
580 } else {
581 return currentTime;
582 }
583 }
584
585
586
587
588
589
590 if (PlatformDependent.isWindows()) {
591 sleepTimeMs = sleepTimeMs / 10 * 10;
592 if (sleepTimeMs == 0) {
593 sleepTimeMs = 1;
594 }
595 }
596
597 try {
598 Thread.sleep(sleepTimeMs);
599 } catch (InterruptedException ignored) {
600 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
601 return Long.MIN_VALUE;
602 }
603 }
604 }
605 }
606
607 public Set<Timeout> unprocessedTimeouts() {
608 return Collections.unmodifiableSet(unprocessedTimeouts);
609 }
610 }
611
612 private static final class HashedWheelTimeout implements Timeout, Runnable {
613
614 private static final int ST_INIT = 0;
615 private static final int ST_CANCELLED = 1;
616 private static final int ST_EXPIRED = 2;
617 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
618 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
619
620 private final HashedWheelTimer timer;
621 private final TimerTask task;
622 private final long deadline;
623
624 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
625 private volatile int state = ST_INIT;
626
627
628
629 long remainingRounds;
630
631
632
633 HashedWheelTimeout next;
634 HashedWheelTimeout prev;
635
636
637 HashedWheelBucket bucket;
638
639 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
640 this.timer = timer;
641 this.task = task;
642 this.deadline = deadline;
643 }
644
645 @Override
646 public Timer timer() {
647 return timer;
648 }
649
650 @Override
651 public TimerTask task() {
652 return task;
653 }
654
655 @Override
656 public boolean cancel() {
657
658 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
659 return false;
660 }
661
662
663
664 timer.cancelledTimeouts.add(this);
665 return true;
666 }
667
668 void removeAfterCancellation() {
669 HashedWheelBucket bucket = this.bucket;
670 if (bucket != null) {
671 bucket.remove(this);
672 } else {
673 timer.pendingTimeouts.decrementAndGet();
674 }
675 task.cancelled(this);
676 }
677
678 public boolean compareAndSetState(int expected, int state) {
679 return STATE_UPDATER.compareAndSet(this, expected, state);
680 }
681
682 public int state() {
683 return state;
684 }
685
686 @Override
687 public boolean isCancelled() {
688 return state() == ST_CANCELLED;
689 }
690
691 @Override
692 public boolean isExpired() {
693 return state() == ST_EXPIRED;
694 }
695
696 public void expire() {
697 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
698 return;
699 }
700
701 try {
702 timer.taskExecutor.execute(this);
703 } catch (Throwable t) {
704 if (logger.isWarnEnabled()) {
705 logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
706 + " for execution.", t);
707 }
708 }
709 }
710
711 @Override
712 public void run() {
713 try {
714 task.run(this);
715 } catch (Throwable t) {
716 if (logger.isWarnEnabled()) {
717 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
718 }
719 }
720 }
721
722 @Override
723 public String toString() {
724 final long currentTime = System.nanoTime();
725 long remaining = deadline - currentTime + timer.startTime;
726
727 StringBuilder buf = new StringBuilder(192)
728 .append(simpleClassName(this))
729 .append('(')
730 .append("deadline: ");
731 if (remaining > 0) {
732 buf.append(remaining)
733 .append(" ns later");
734 } else if (remaining < 0) {
735 buf.append(-remaining)
736 .append(" ns ago");
737 } else {
738 buf.append("now");
739 }
740
741 if (isCancelled()) {
742 buf.append(", cancelled");
743 }
744
745 return buf.append(", task: ")
746 .append(task())
747 .append(')')
748 .toString();
749 }
750 }
751
752
753
754
755
756
757 private static final class HashedWheelBucket {
758
759 private HashedWheelTimeout head;
760 private HashedWheelTimeout tail;
761
762
763
764
765 public void addTimeout(HashedWheelTimeout timeout) {
766 assert timeout.bucket == null;
767 timeout.bucket = this;
768 if (head == null) {
769 head = tail = timeout;
770 } else {
771 tail.next = timeout;
772 timeout.prev = tail;
773 tail = timeout;
774 }
775 }
776
777
778
779
780 public void expireTimeouts(long deadline) {
781 HashedWheelTimeout timeout = head;
782
783
784 while (timeout != null) {
785 HashedWheelTimeout next = timeout.next;
786 if (timeout.remainingRounds <= 0) {
787 next = remove(timeout);
788 if (timeout.deadline <= deadline) {
789 timeout.expire();
790 } else {
791
792 throw new IllegalStateException(String.format(
793 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
794 }
795 } else if (timeout.isCancelled()) {
796 next = remove(timeout);
797 } else {
798 timeout.remainingRounds --;
799 }
800 timeout = next;
801 }
802 }
803
804 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
805 HashedWheelTimeout next = timeout.next;
806
807 if (timeout.prev != null) {
808 timeout.prev.next = next;
809 }
810 if (timeout.next != null) {
811 timeout.next.prev = timeout.prev;
812 }
813
814 if (timeout == head) {
815
816 if (timeout == tail) {
817 tail = null;
818 head = null;
819 } else {
820 head = next;
821 }
822 } else if (timeout == tail) {
823
824 tail = timeout.prev;
825 }
826
827 timeout.prev = null;
828 timeout.next = null;
829 timeout.bucket = null;
830 timeout.timer.pendingTimeouts.decrementAndGet();
831 return next;
832 }
833
834
835
836
837 public void clearTimeouts(Set<Timeout> set) {
838 for (;;) {
839 HashedWheelTimeout timeout = pollTimeout();
840 if (timeout == null) {
841 return;
842 }
843 if (timeout.isExpired() || timeout.isCancelled()) {
844 continue;
845 }
846 set.add(timeout);
847 }
848 }
849
850 private HashedWheelTimeout pollTimeout() {
851 HashedWheelTimeout head = this.head;
852 if (head == null) {
853 return null;
854 }
855 HashedWheelTimeout next = head.next;
856 if (next == null) {
857 tail = this.head = null;
858 } else {
859 this.head = next;
860 next.prev = null;
861 }
862
863
864 head.next = null;
865 head.prev = null;
866 head.bucket = null;
867 return head;
868 }
869 }
870 }