1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import io.netty.util.internal.PlatformDependent;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.util.Collections;
23 import java.util.HashSet;
24 import java.util.Queue;
25 import java.util.Set;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import static io.netty.util.internal.StringUtil.simpleClassName;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 public class HashedWheelTimer implements Timer {
80
81 static final InternalLogger logger =
82 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
83
84 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
85 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
86 private static final int INSTANCE_COUNT_LIMIT = 64;
87 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
88 .newResourceLeakDetector(HashedWheelTimer.class, 1);
89
90 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
91 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
92
93 private final ResourceLeakTracker<HashedWheelTimer> leak;
94 private final Worker worker = new Worker();
95 private final Thread workerThread;
96
97 public static final int WORKER_STATE_INIT = 0;
98 public static final int WORKER_STATE_STARTED = 1;
99 public static final int WORKER_STATE_SHUTDOWN = 2;
100 @SuppressWarnings({ "unused", "FieldMayBeFinal" })
101 private volatile int workerState;
102
103 private final long tickDuration;
104 private final HashedWheelBucket[] wheel;
105 private final int mask;
106 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
107 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
108 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
109 private final AtomicLong pendingTimeouts = new AtomicLong(0);
110 private final long maxPendingTimeouts;
111
112 private volatile long startTime;
113
114
115
116
117
118
119 public HashedWheelTimer() {
120 this(Executors.defaultThreadFactory());
121 }
122
123
124
125
126
127
128
129
130
131
132
133 public HashedWheelTimer(long tickDuration, TimeUnit unit) {
134 this(Executors.defaultThreadFactory(), tickDuration, unit);
135 }
136
137
138
139
140
141
142
143
144
145
146
147 public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
148 this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
149 }
150
151
152
153
154
155
156
157
158
159
160 public HashedWheelTimer(ThreadFactory threadFactory) {
161 this(threadFactory, 100, TimeUnit.MILLISECONDS);
162 }
163
164
165
166
167
168
169
170
171
172
173
174
175 public HashedWheelTimer(
176 ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
177 this(threadFactory, tickDuration, unit, 512);
178 }
179
180
181
182
183
184
185
186
187
188
189
190
191
192 public HashedWheelTimer(
193 ThreadFactory threadFactory,
194 long tickDuration, TimeUnit unit, int ticksPerWheel) {
195 this(threadFactory, tickDuration, unit, ticksPerWheel, true);
196 }
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213 public HashedWheelTimer(
214 ThreadFactory threadFactory,
215 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
216 this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
217 }
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239 public HashedWheelTimer(
240 ThreadFactory threadFactory,
241 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
242 long maxPendingTimeouts) {
243
244 if (threadFactory == null) {
245 throw new NullPointerException("threadFactory");
246 }
247 if (unit == null) {
248 throw new NullPointerException("unit");
249 }
250 if (tickDuration <= 0) {
251 throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
252 }
253 if (ticksPerWheel <= 0) {
254 throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
255 }
256
257
258 wheel = createWheel(ticksPerWheel);
259 mask = wheel.length - 1;
260
261
262 this.tickDuration = unit.toNanos(tickDuration);
263
264
265 if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
266 throw new IllegalArgumentException(String.format(
267 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
268 tickDuration, Long.MAX_VALUE / wheel.length));
269 }
270 workerThread = threadFactory.newThread(worker);
271
272 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
273
274 this.maxPendingTimeouts = maxPendingTimeouts;
275
276 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
277 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
278 reportTooManyInstances();
279 }
280 }
281
282 @Override
283 protected void finalize() throws Throwable {
284 try {
285 super.finalize();
286 } finally {
287
288
289 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
290 INSTANCE_COUNTER.decrementAndGet();
291 }
292 }
293 }
294
295 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
296 if (ticksPerWheel <= 0) {
297 throw new IllegalArgumentException(
298 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
299 }
300 if (ticksPerWheel > 1073741824) {
301 throw new IllegalArgumentException(
302 "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
303 }
304
305 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
306 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
307 for (int i = 0; i < wheel.length; i ++) {
308 wheel[i] = new HashedWheelBucket();
309 }
310 return wheel;
311 }
312
313 private static int normalizeTicksPerWheel(int ticksPerWheel) {
314 int normalizedTicksPerWheel = 1;
315 while (normalizedTicksPerWheel < ticksPerWheel) {
316 normalizedTicksPerWheel <<= 1;
317 }
318 return normalizedTicksPerWheel;
319 }
320
321
322
323
324
325
326
327
328 public void start() {
329 switch (WORKER_STATE_UPDATER.get(this)) {
330 case WORKER_STATE_INIT:
331 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
332 workerThread.start();
333 }
334 break;
335 case WORKER_STATE_STARTED:
336 break;
337 case WORKER_STATE_SHUTDOWN:
338 throw new IllegalStateException("cannot be started once stopped");
339 default:
340 throw new Error("Invalid WorkerState");
341 }
342
343
344 while (startTime == 0) {
345 try {
346 startTimeInitialized.await();
347 } catch (InterruptedException ignore) {
348
349 }
350 }
351 }
352
353 @Override
354 public Set<Timeout> stop() {
355 if (Thread.currentThread() == workerThread) {
356 throw new IllegalStateException(
357 HashedWheelTimer.class.getSimpleName() +
358 ".stop() cannot be called from " +
359 TimerTask.class.getSimpleName());
360 }
361
362 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
363
364 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
365 INSTANCE_COUNTER.decrementAndGet();
366 if (leak != null) {
367 boolean closed = leak.close(this);
368 assert closed;
369 }
370 }
371
372 return Collections.emptySet();
373 }
374
375 try {
376 boolean interrupted = false;
377 while (workerThread.isAlive()) {
378 workerThread.interrupt();
379 try {
380 workerThread.join(100);
381 } catch (InterruptedException ignored) {
382 interrupted = true;
383 }
384 }
385
386 if (interrupted) {
387 Thread.currentThread().interrupt();
388 }
389 } finally {
390 INSTANCE_COUNTER.decrementAndGet();
391 if (leak != null) {
392 boolean closed = leak.close(this);
393 assert closed;
394 }
395 }
396 return worker.unprocessedTimeouts();
397 }
398
399 @Override
400 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
401 if (task == null) {
402 throw new NullPointerException("task");
403 }
404 if (unit == null) {
405 throw new NullPointerException("unit");
406 }
407
408 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
409
410 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
411 pendingTimeouts.decrementAndGet();
412 throw new RejectedExecutionException("Number of pending timeouts ("
413 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
414 + "timeouts (" + maxPendingTimeouts + ")");
415 }
416
417 start();
418
419
420
421 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
422
423
424 if (delay > 0 && deadline < 0) {
425 deadline = Long.MAX_VALUE;
426 }
427 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
428 timeouts.add(timeout);
429 return timeout;
430 }
431
432
433
434
435 public long pendingTimeouts() {
436 return pendingTimeouts.get();
437 }
438
439 private static void reportTooManyInstances() {
440 String resourceType = simpleClassName(HashedWheelTimer.class);
441 logger.error("You are creating too many " + resourceType + " instances. " +
442 resourceType + " is a shared resource that must be reused across the JVM," +
443 "so that only a few instances are created.");
444 }
445
446 private final class Worker implements Runnable {
447 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
448
449 private long tick;
450
451 @Override
452 public void run() {
453
454 startTime = System.nanoTime();
455 if (startTime == 0) {
456
457 startTime = 1;
458 }
459
460
461 startTimeInitialized.countDown();
462
463 do {
464 final long deadline = waitForNextTick();
465 if (deadline > 0) {
466 int idx = (int) (tick & mask);
467 processCancelledTasks();
468 HashedWheelBucket bucket =
469 wheel[idx];
470 transferTimeoutsToBuckets();
471 bucket.expireTimeouts(deadline);
472 tick++;
473 }
474 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
475
476
477 for (HashedWheelBucket bucket: wheel) {
478 bucket.clearTimeouts(unprocessedTimeouts);
479 }
480 for (;;) {
481 HashedWheelTimeout timeout = timeouts.poll();
482 if (timeout == null) {
483 break;
484 }
485 if (!timeout.isCancelled()) {
486 unprocessedTimeouts.add(timeout);
487 }
488 }
489 processCancelledTasks();
490 }
491
492 private void transferTimeoutsToBuckets() {
493
494
495 for (int i = 0; i < 100000; i++) {
496 HashedWheelTimeout timeout = timeouts.poll();
497 if (timeout == null) {
498
499 break;
500 }
501 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
502
503 continue;
504 }
505
506 long calculated = timeout.deadline / tickDuration;
507 timeout.remainingRounds = (calculated - tick) / wheel.length;
508
509 final long ticks = Math.max(calculated, tick);
510 int stopIndex = (int) (ticks & mask);
511
512 HashedWheelBucket bucket = wheel[stopIndex];
513 bucket.addTimeout(timeout);
514 }
515 }
516
517 private void processCancelledTasks() {
518 for (;;) {
519 HashedWheelTimeout timeout = cancelledTimeouts.poll();
520 if (timeout == null) {
521
522 break;
523 }
524 try {
525 timeout.remove();
526 } catch (Throwable t) {
527 if (logger.isWarnEnabled()) {
528 logger.warn("An exception was thrown while process a cancellation task", t);
529 }
530 }
531 }
532 }
533
534
535
536
537
538
539
540 private long waitForNextTick() {
541 long deadline = tickDuration * (tick + 1);
542
543 for (;;) {
544 final long currentTime = System.nanoTime() - startTime;
545 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
546
547 if (sleepTimeMs <= 0) {
548 if (currentTime == Long.MIN_VALUE) {
549 return -Long.MAX_VALUE;
550 } else {
551 return currentTime;
552 }
553 }
554
555
556
557
558
559
560 if (PlatformDependent.isWindows()) {
561 sleepTimeMs = sleepTimeMs / 10 * 10;
562 }
563
564 try {
565 Thread.sleep(sleepTimeMs);
566 } catch (InterruptedException ignored) {
567 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
568 return Long.MIN_VALUE;
569 }
570 }
571 }
572 }
573
574 public Set<Timeout> unprocessedTimeouts() {
575 return Collections.unmodifiableSet(unprocessedTimeouts);
576 }
577 }
578
579 private static final class HashedWheelTimeout implements Timeout {
580
581 private static final int ST_INIT = 0;
582 private static final int ST_CANCELLED = 1;
583 private static final int ST_EXPIRED = 2;
584 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
585 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
586
587 private final HashedWheelTimer timer;
588 private final TimerTask task;
589 private final long deadline;
590
591 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
592 private volatile int state = ST_INIT;
593
594
595
596 long remainingRounds;
597
598
599
600 HashedWheelTimeout next;
601 HashedWheelTimeout prev;
602
603
604 HashedWheelBucket bucket;
605
606 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
607 this.timer = timer;
608 this.task = task;
609 this.deadline = deadline;
610 }
611
612 @Override
613 public Timer timer() {
614 return timer;
615 }
616
617 @Override
618 public TimerTask task() {
619 return task;
620 }
621
622 @Override
623 public boolean cancel() {
624
625 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
626 return false;
627 }
628
629
630
631 timer.cancelledTimeouts.add(this);
632 return true;
633 }
634
635 void remove() {
636 HashedWheelBucket bucket = this.bucket;
637 if (bucket != null) {
638 bucket.remove(this);
639 } else {
640 timer.pendingTimeouts.decrementAndGet();
641 }
642 }
643
644 public boolean compareAndSetState(int expected, int state) {
645 return STATE_UPDATER.compareAndSet(this, expected, state);
646 }
647
648 public int state() {
649 return state;
650 }
651
652 @Override
653 public boolean isCancelled() {
654 return state() == ST_CANCELLED;
655 }
656
657 @Override
658 public boolean isExpired() {
659 return state() == ST_EXPIRED;
660 }
661
662 public void expire() {
663 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
664 return;
665 }
666
667 try {
668 task.run(this);
669 } catch (Throwable t) {
670 if (logger.isWarnEnabled()) {
671 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
672 }
673 }
674 }
675
676 @Override
677 public String toString() {
678 final long currentTime = System.nanoTime();
679 long remaining = deadline - currentTime + timer.startTime;
680
681 StringBuilder buf = new StringBuilder(192)
682 .append(simpleClassName(this))
683 .append('(')
684 .append("deadline: ");
685 if (remaining > 0) {
686 buf.append(remaining)
687 .append(" ns later");
688 } else if (remaining < 0) {
689 buf.append(-remaining)
690 .append(" ns ago");
691 } else {
692 buf.append("now");
693 }
694
695 if (isCancelled()) {
696 buf.append(", cancelled");
697 }
698
699 return buf.append(", task: ")
700 .append(task())
701 .append(')')
702 .toString();
703 }
704 }
705
706
707
708
709
710
711 private static final class HashedWheelBucket {
712
713 private HashedWheelTimeout head;
714 private HashedWheelTimeout tail;
715
716
717
718
719 public void addTimeout(HashedWheelTimeout timeout) {
720 assert timeout.bucket == null;
721 timeout.bucket = this;
722 if (head == null) {
723 head = tail = timeout;
724 } else {
725 tail.next = timeout;
726 timeout.prev = tail;
727 tail = timeout;
728 }
729 }
730
731
732
733
734 public void expireTimeouts(long deadline) {
735 HashedWheelTimeout timeout = head;
736
737
738 while (timeout != null) {
739 HashedWheelTimeout next = timeout.next;
740 if (timeout.remainingRounds <= 0) {
741 next = remove(timeout);
742 if (timeout.deadline <= deadline) {
743 timeout.expire();
744 } else {
745
746 throw new IllegalStateException(String.format(
747 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
748 }
749 } else if (timeout.isCancelled()) {
750 next = remove(timeout);
751 } else {
752 timeout.remainingRounds --;
753 }
754 timeout = next;
755 }
756 }
757
758 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
759 HashedWheelTimeout next = timeout.next;
760
761 if (timeout.prev != null) {
762 timeout.prev.next = next;
763 }
764 if (timeout.next != null) {
765 timeout.next.prev = timeout.prev;
766 }
767
768 if (timeout == head) {
769
770 if (timeout == tail) {
771 tail = null;
772 head = null;
773 } else {
774 head = next;
775 }
776 } else if (timeout == tail) {
777
778 tail = timeout.prev;
779 }
780
781 timeout.prev = null;
782 timeout.next = null;
783 timeout.bucket = null;
784 timeout.timer.pendingTimeouts.decrementAndGet();
785 return next;
786 }
787
788
789
790
791 public void clearTimeouts(Set<Timeout> set) {
792 for (;;) {
793 HashedWheelTimeout timeout = pollTimeout();
794 if (timeout == null) {
795 return;
796 }
797 if (timeout.isExpired() || timeout.isCancelled()) {
798 continue;
799 }
800 set.add(timeout);
801 }
802 }
803
804 private HashedWheelTimeout pollTimeout() {
805 HashedWheelTimeout head = this.head;
806 if (head == null) {
807 return null;
808 }
809 HashedWheelTimeout next = head.next;
810 if (next == null) {
811 tail = this.head = null;
812 } else {
813 this.head = next;
814 next.prev = null;
815 }
816
817
818 head.next = null;
819 head.prev = null;
820 head.bucket = null;
821 return head;
822 }
823 }
824 }