1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import io.netty.util.internal.PlatformDependent;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.util.Collections;
23 import java.util.HashSet;
24 import java.util.Queue;
25 import java.util.Set;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import static io.netty.util.internal.StringUtil.simpleClassName;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 public class HashedWheelTimer implements Timer {
80
81 static final InternalLogger logger =
82 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
83
84 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
85 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
86 private static final int INSTANCE_COUNT_LIMIT = 64;
87 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
88 .newResourceLeakDetector(HashedWheelTimer.class, 1);
89
90 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
91 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
92
93 private final ResourceLeakTracker<HashedWheelTimer> leak;
94 private final Worker worker = new Worker();
95 private final Thread workerThread;
96
97 public static final int WORKER_STATE_INIT = 0;
98 public static final int WORKER_STATE_STARTED = 1;
99 public static final int WORKER_STATE_SHUTDOWN = 2;
100 @SuppressWarnings({ "unused", "FieldMayBeFinal" })
101 private volatile int workerState;
102
103 private final long tickDuration;
104 private final HashedWheelBucket[] wheel;
105 private final int mask;
106 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
107 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
108 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
109 private final AtomicLong pendingTimeouts = new AtomicLong(0);
110 private final long maxPendingTimeouts;
111
112 private volatile long startTime;
113
114
115
116
117
118
/**
 * Creates a new timer whose worker thread comes from
 * {@link Executors#defaultThreadFactory()}. All remaining settings take the
 * defaults supplied by the constructors this one chains to.
 */
public HashedWheelTimer() {
    this(Executors.defaultThreadFactory());
}
122
123
124
125
126
127
128
129
130
131
132
/**
 * Creates a new timer with the given tick duration and a worker thread from
 * {@link Executors#defaultThreadFactory()}.
 *
 * @param tickDuration the duration between ticks; must be greater than 0
 * @param unit         the time unit of {@code tickDuration}; must not be {@code null}
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is {@code <= 0}
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
    this(Executors.defaultThreadFactory(), tickDuration, unit);
}
136
137
138
139
140
141
142
143
144
145
146
/**
 * Creates a new timer with the given tick duration and wheel size, using a
 * worker thread from {@link Executors#defaultThreadFactory()}.
 *
 * @param tickDuration  the duration between ticks; must be greater than 0
 * @param unit          the time unit of {@code tickDuration}; must not be {@code null}
 * @param ticksPerWheel the requested wheel size; must be greater than 0
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is {@code <= 0}
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
150
151
152
153
154
155
156
157
158
159
/**
 * Creates a new timer with a tick duration of 100 milliseconds.
 *
 * @param threadFactory the factory that creates the worker thread; must not be {@code null}
 * @throws NullPointerException if {@code threadFactory} is {@code null}
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(threadFactory, 100L, TimeUnit.MILLISECONDS);
}
163
164
165
166
167
168
169
170
171
172
173
174
/**
 * Creates a new timer with a requested wheel size of 512 ticks.
 *
 * @param threadFactory the factory that creates the worker thread; must not be {@code null}
 * @param tickDuration  the duration between ticks; must be greater than 0
 * @param unit          the time unit of {@code tickDuration}; must not be {@code null}
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is {@code <= 0}
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);
}
179
180
181
182
183
184
185
186
187
188
189
190
191
/**
 * Creates a new timer with leak detection enabled.
 *
 * @param threadFactory the factory that creates the worker thread; must not be {@code null}
 * @param tickDuration  the duration between ticks; must be greater than 0
 * @param unit          the time unit of {@code tickDuration}; must not be {@code null}
 * @param ticksPerWheel the requested wheel size; must be greater than 0
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is {@code <= 0}
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/**
 * Creates a new timer without a limit on the number of pending timeouts
 * (a {@code maxPendingTimeouts} of {@code -1} is passed on).
 *
 * @param threadFactory the factory that creates the worker thread; must not be {@code null}
 * @param tickDuration  the duration between ticks; must be greater than 0
 * @param unit          the time unit of {@code tickDuration}; must not be {@code null}
 * @param ticksPerWheel the requested wheel size; must be greater than 0
 * @param leakDetection {@code true} to always track this instance for leaks; when {@code false}
 *                      the instance is still tracked if the worker thread is not a daemon thread
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is {@code <= 0}
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
        boolean leakDetection) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1L);
}
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
/**
 * Creates a new timer. This is the constructor every other constructor chains to.
 *
 * @param threadFactory      the factory that creates the worker thread; must not be {@code null}
 * @param tickDuration       the duration between ticks; must be greater than 0
 * @param unit               the time unit of {@code tickDuration}; must not be {@code null}
 * @param ticksPerWheel      the requested wheel size; must be greater than 0. The actual wheel
 *                           length is this value rounded up to a power of two.
 * @param leakDetection      {@code true} to always track this instance for leaks; when {@code false}
 *                           the instance is still tracked if the worker thread is not a daemon thread
 * @param maxPendingTimeouts the maximum number of pending timeouts after which
 *                           {@code newTimeout} throws {@link java.util.concurrent.RejectedExecutionException};
 *                           a value {@code <= 0} means no limit
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is {@code <= 0},
 *                                  or if the tick duration in nanoseconds is too large for the wheel
 */
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // The wheel length is normalized to a power of two, so (length - 1) works as an index mask.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;

    // All internal time arithmetic is done in nanoseconds.
    this.tickDuration = unit.toNanos(tickDuration);

    // Reject tick durations so large that one full wheel revolution would overflow a long.
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        // The message previously lacked the closing parenthesis.
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d)",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }
    workerThread = threadFactory.newThread(worker);

    // Track the instance when asked to, or when a non-daemon worker thread is in use.
    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

    this.maxPendingTimeouts = maxPendingTimeouts;

    // Log an error once when more than INSTANCE_COUNT_LIMIT timers are alive.
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
281
282 @Override
283 protected void finalize() throws Throwable {
284 try {
285 super.finalize();
286 } finally {
287
288
289 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
290 INSTANCE_COUNTER.decrementAndGet();
291 }
292 }
293 }
294
295 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
296 if (ticksPerWheel <= 0) {
297 throw new IllegalArgumentException(
298 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
299 }
300 if (ticksPerWheel > 1073741824) {
301 throw new IllegalArgumentException(
302 "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
303 }
304
305 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
306 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
307 for (int i = 0; i < wheel.length; i ++) {
308 wheel[i] = new HashedWheelBucket();
309 }
310 return wheel;
311 }
312
313 private static int normalizeTicksPerWheel(int ticksPerWheel) {
314 int normalizedTicksPerWheel = 1;
315 while (normalizedTicksPerWheel < ticksPerWheel) {
316 normalizedTicksPerWheel <<= 1;
317 }
318 return normalizedTicksPerWheel;
319 }
320
321
322
323
324
325
326
327
328 public void start() {
329 switch (WORKER_STATE_UPDATER.get(this)) {
330 case WORKER_STATE_INIT:
331 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
332 workerThread.start();
333 }
334 break;
335 case WORKER_STATE_STARTED:
336 break;
337 case WORKER_STATE_SHUTDOWN:
338 throw new IllegalStateException("cannot be started once stopped");
339 default:
340 throw new Error("Invalid WorkerState");
341 }
342
343
344 while (startTime == 0) {
345 try {
346 startTimeInitialized.await();
347 } catch (InterruptedException ignore) {
348
349 }
350 }
351 }
352
353 @Override
354 public Set<Timeout> stop() {
355 if (Thread.currentThread() == workerThread) {
356 throw new IllegalStateException(
357 HashedWheelTimer.class.getSimpleName() +
358 ".stop() cannot be called from " +
359 TimerTask.class.getSimpleName());
360 }
361
362 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
363
364 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
365 INSTANCE_COUNTER.decrementAndGet();
366 if (leak != null) {
367 boolean closed = leak.close(this);
368 assert closed;
369 }
370 }
371
372 return Collections.emptySet();
373 }
374
375 try {
376 boolean interrupted = false;
377 while (workerThread.isAlive()) {
378 workerThread.interrupt();
379 try {
380 workerThread.join(100);
381 } catch (InterruptedException ignored) {
382 interrupted = true;
383 }
384 }
385
386 if (interrupted) {
387 Thread.currentThread().interrupt();
388 }
389 } finally {
390 INSTANCE_COUNTER.decrementAndGet();
391 if (leak != null) {
392 boolean closed = leak.close(this);
393 assert closed;
394 }
395 }
396 return worker.unprocessedTimeouts();
397 }
398
399 @Override
400 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
401 if (task == null) {
402 throw new NullPointerException("task");
403 }
404 if (unit == null) {
405 throw new NullPointerException("unit");
406 }
407
408 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
409
410 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
411 pendingTimeouts.decrementAndGet();
412 throw new RejectedExecutionException("Number of pending timeouts ("
413 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
414 + "timeouts (" + maxPendingTimeouts + ")");
415 }
416
417 start();
418
419
420
421 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
422 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
423 timeouts.add(timeout);
424 return timeout;
425 }
426
427
428
429
430 public long pendingTimeouts() {
431 return pendingTimeouts.get();
432 }
433
434 private static void reportTooManyInstances() {
435 String resourceType = simpleClassName(HashedWheelTimer.class);
436 logger.error("You are creating too many " + resourceType + " instances. " +
437 resourceType + " is a shared resource that must be reused across the JVM," +
438 "so that only a few instances are created.");
439 }
440
441 private final class Worker implements Runnable {
442 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
443
444 private long tick;
445
446 @Override
447 public void run() {
448
449 startTime = System.nanoTime();
450 if (startTime == 0) {
451
452 startTime = 1;
453 }
454
455
456 startTimeInitialized.countDown();
457
458 do {
459 final long deadline = waitForNextTick();
460 if (deadline > 0) {
461 int idx = (int) (tick & mask);
462 processCancelledTasks();
463 HashedWheelBucket bucket =
464 wheel[idx];
465 transferTimeoutsToBuckets();
466 bucket.expireTimeouts(deadline);
467 tick++;
468 }
469 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
470
471
472 for (HashedWheelBucket bucket: wheel) {
473 bucket.clearTimeouts(unprocessedTimeouts);
474 }
475 for (;;) {
476 HashedWheelTimeout timeout = timeouts.poll();
477 if (timeout == null) {
478 break;
479 }
480 if (!timeout.isCancelled()) {
481 unprocessedTimeouts.add(timeout);
482 }
483 }
484 processCancelledTasks();
485 }
486
487 private void transferTimeoutsToBuckets() {
488
489
490 for (int i = 0; i < 100000; i++) {
491 HashedWheelTimeout timeout = timeouts.poll();
492 if (timeout == null) {
493
494 break;
495 }
496 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
497
498 continue;
499 }
500
501 long calculated = timeout.deadline / tickDuration;
502 timeout.remainingRounds = (calculated - tick) / wheel.length;
503
504 final long ticks = Math.max(calculated, tick);
505 int stopIndex = (int) (ticks & mask);
506
507 HashedWheelBucket bucket = wheel[stopIndex];
508 bucket.addTimeout(timeout);
509 }
510 }
511
512 private void processCancelledTasks() {
513 for (;;) {
514 HashedWheelTimeout timeout = cancelledTimeouts.poll();
515 if (timeout == null) {
516
517 break;
518 }
519 try {
520 timeout.remove();
521 } catch (Throwable t) {
522 if (logger.isWarnEnabled()) {
523 logger.warn("An exception was thrown while process a cancellation task", t);
524 }
525 }
526 }
527 }
528
529
530
531
532
533
534
535 private long waitForNextTick() {
536 long deadline = tickDuration * (tick + 1);
537
538 for (;;) {
539 final long currentTime = System.nanoTime() - startTime;
540 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
541
542 if (sleepTimeMs <= 0) {
543 if (currentTime == Long.MIN_VALUE) {
544 return -Long.MAX_VALUE;
545 } else {
546 return currentTime;
547 }
548 }
549
550
551
552
553
554
555 if (PlatformDependent.isWindows()) {
556 sleepTimeMs = sleepTimeMs / 10 * 10;
557 }
558
559 try {
560 Thread.sleep(sleepTimeMs);
561 } catch (InterruptedException ignored) {
562 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
563 return Long.MIN_VALUE;
564 }
565 }
566 }
567 }
568
569 public Set<Timeout> unprocessedTimeouts() {
570 return Collections.unmodifiableSet(unprocessedTimeouts);
571 }
572 }
573
574 private static final class HashedWheelTimeout implements Timeout {
575
576 private static final int ST_INIT = 0;
577 private static final int ST_CANCELLED = 1;
578 private static final int ST_EXPIRED = 2;
579 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
580 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
581
582 private final HashedWheelTimer timer;
583 private final TimerTask task;
584 private final long deadline;
585
586 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
587 private volatile int state = ST_INIT;
588
589
590
591 long remainingRounds;
592
593
594
595 HashedWheelTimeout next;
596 HashedWheelTimeout prev;
597
598
599 HashedWheelBucket bucket;
600
601 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
602 this.timer = timer;
603 this.task = task;
604 this.deadline = deadline;
605 }
606
607 @Override
608 public Timer timer() {
609 return timer;
610 }
611
612 @Override
613 public TimerTask task() {
614 return task;
615 }
616
617 @Override
618 public boolean cancel() {
619
620 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
621 return false;
622 }
623
624
625
626 timer.cancelledTimeouts.add(this);
627 return true;
628 }
629
630 void remove() {
631 HashedWheelBucket bucket = this.bucket;
632 if (bucket != null) {
633 bucket.remove(this);
634 } else {
635 timer.pendingTimeouts.decrementAndGet();
636 }
637 }
638
639 public boolean compareAndSetState(int expected, int state) {
640 return STATE_UPDATER.compareAndSet(this, expected, state);
641 }
642
643 public int state() {
644 return state;
645 }
646
647 @Override
648 public boolean isCancelled() {
649 return state() == ST_CANCELLED;
650 }
651
652 @Override
653 public boolean isExpired() {
654 return state() == ST_EXPIRED;
655 }
656
657 public void expire() {
658 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
659 return;
660 }
661
662 try {
663 task.run(this);
664 } catch (Throwable t) {
665 if (logger.isWarnEnabled()) {
666 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
667 }
668 }
669 }
670
671 @Override
672 public String toString() {
673 final long currentTime = System.nanoTime();
674 long remaining = deadline - currentTime + timer.startTime;
675
676 StringBuilder buf = new StringBuilder(192)
677 .append(simpleClassName(this))
678 .append('(')
679 .append("deadline: ");
680 if (remaining > 0) {
681 buf.append(remaining)
682 .append(" ns later");
683 } else if (remaining < 0) {
684 buf.append(-remaining)
685 .append(" ns ago");
686 } else {
687 buf.append("now");
688 }
689
690 if (isCancelled()) {
691 buf.append(", cancelled");
692 }
693
694 return buf.append(", task: ")
695 .append(task())
696 .append(')')
697 .toString();
698 }
699 }
700
701
702
703
704
705
706 private static final class HashedWheelBucket {
707
708 private HashedWheelTimeout head;
709 private HashedWheelTimeout tail;
710
711
712
713
714 public void addTimeout(HashedWheelTimeout timeout) {
715 assert timeout.bucket == null;
716 timeout.bucket = this;
717 if (head == null) {
718 head = tail = timeout;
719 } else {
720 tail.next = timeout;
721 timeout.prev = tail;
722 tail = timeout;
723 }
724 }
725
726
727
728
729 public void expireTimeouts(long deadline) {
730 HashedWheelTimeout timeout = head;
731
732
733 while (timeout != null) {
734 HashedWheelTimeout next = timeout.next;
735 if (timeout.remainingRounds <= 0) {
736 next = remove(timeout);
737 if (timeout.deadline <= deadline) {
738 timeout.expire();
739 } else {
740
741 throw new IllegalStateException(String.format(
742 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
743 }
744 } else if (timeout.isCancelled()) {
745 next = remove(timeout);
746 } else {
747 timeout.remainingRounds --;
748 }
749 timeout = next;
750 }
751 }
752
753 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
754 HashedWheelTimeout next = timeout.next;
755
756 if (timeout.prev != null) {
757 timeout.prev.next = next;
758 }
759 if (timeout.next != null) {
760 timeout.next.prev = timeout.prev;
761 }
762
763 if (timeout == head) {
764
765 if (timeout == tail) {
766 tail = null;
767 head = null;
768 } else {
769 head = next;
770 }
771 } else if (timeout == tail) {
772
773 tail = timeout.prev;
774 }
775
776 timeout.prev = null;
777 timeout.next = null;
778 timeout.bucket = null;
779 timeout.timer.pendingTimeouts.decrementAndGet();
780 return next;
781 }
782
783
784
785
786 public void clearTimeouts(Set<Timeout> set) {
787 for (;;) {
788 HashedWheelTimeout timeout = pollTimeout();
789 if (timeout == null) {
790 return;
791 }
792 if (timeout.isExpired() || timeout.isCancelled()) {
793 continue;
794 }
795 set.add(timeout);
796 }
797 }
798
799 private HashedWheelTimeout pollTimeout() {
800 HashedWheelTimeout head = this.head;
801 if (head == null) {
802 return null;
803 }
804 HashedWheelTimeout next = head.next;
805 if (next == null) {
806 tail = this.head = null;
807 } else {
808 this.head = next;
809 next.prev = null;
810 }
811
812
813 head.next = null;
814 head.prev = null;
815 head.bucket = null;
816 return head;
817 }
818 }
819 }