1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.util;
17
18 import org.jboss.netty.channel.ChannelPipelineFactory;
19 import org.jboss.netty.logging.InternalLogger;
20 import org.jboss.netty.logging.InternalLoggerFactory;
21 import org.jboss.netty.util.internal.DetectionUtil;
22 import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
23
24 import java.util.Collections;
25 import java.util.HashSet;
26 import java.util.Queue;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 public class HashedWheelTimer implements Timer {
80
81 static final InternalLogger logger =
82 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
83 private static final AtomicInteger id = new AtomicInteger();
84
85 private static final SharedResourceMisuseDetector misuseDetector =
86 new SharedResourceMisuseDetector(HashedWheelTimer.class);
87
88 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
89 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
90
91 private final Worker worker = new Worker();
92 private final Thread workerThread;
93
94 public static final int WORKER_STATE_INIT = 0;
95 public static final int WORKER_STATE_STARTED = 1;
96 public static final int WORKER_STATE_SHUTDOWN = 2;
97 @SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
98 private volatile int workerState = WORKER_STATE_INIT;
99
100 private final long tickDuration;
101 private final HashedWheelBucket[] wheel;
102 private final int mask;
103 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
104 private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();
105 private volatile long startTime;
106
107
108
109
110
111
112 public HashedWheelTimer() {
113 this(Executors.defaultThreadFactory());
114 }
115
116
117
118
119
120
121
122
123
124 public HashedWheelTimer(long tickDuration, TimeUnit unit) {
125 this(Executors.defaultThreadFactory(), tickDuration, unit);
126 }
127
128
129
130
131
132
133
134
135
136 public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
137 this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
138 }
139
140
141
142
143
144
145
146
147
148 public HashedWheelTimer(ThreadFactory threadFactory) {
149 this(threadFactory, 100, TimeUnit.MILLISECONDS);
150 }
151
152
153
154
155
156
157
158
159
160
161 public HashedWheelTimer(
162 ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
163 this(threadFactory, tickDuration, unit, 512);
164 }
165
166
167
168
169
170
171
172
173
174
175
176 public HashedWheelTimer(
177 ThreadFactory threadFactory,
178 long tickDuration, TimeUnit unit, int ticksPerWheel) {
179 this(threadFactory, null, tickDuration, unit, ticksPerWheel);
180 }
181
182
183
184
185
186
187
188
189
190
191
192
193 public HashedWheelTimer(
194 ThreadFactory threadFactory,
195 ThreadNameDeterminer determiner,
196 long tickDuration, TimeUnit unit, int ticksPerWheel) {
197
198 if (threadFactory == null) {
199 throw new NullPointerException("threadFactory");
200 }
201 if (unit == null) {
202 throw new NullPointerException("unit");
203 }
204 if (tickDuration <= 0) {
205 throw new IllegalArgumentException(
206 "tickDuration must be greater than 0: " + tickDuration);
207 }
208 if (ticksPerWheel <= 0) {
209 throw new IllegalArgumentException(
210 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
211 }
212
213
214 wheel = createWheel(ticksPerWheel);
215 mask = wheel.length - 1;
216
217
218 this.tickDuration = unit.toNanos(tickDuration);
219
220
221 if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
222 throw new IllegalArgumentException(String.format(
223 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
224 tickDuration, Long.MAX_VALUE / wheel.length));
225 }
226
227 workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
228 worker, "Hashed wheel timer #" + id.incrementAndGet(),
229 determiner));
230
231
232 misuseDetector.increase();
233 }
234
235 @SuppressWarnings("unchecked")
236 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
237 if (ticksPerWheel <= 0) {
238 throw new IllegalArgumentException(
239 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
240 }
241 if (ticksPerWheel > 1073741824) {
242 throw new IllegalArgumentException(
243 "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
244 }
245
246 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
247 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
248 for (int i = 0; i < wheel.length; i ++) {
249 wheel[i] = new HashedWheelBucket();
250 }
251 return wheel;
252 }
253
254 private static int normalizeTicksPerWheel(int ticksPerWheel) {
255 int normalizedTicksPerWheel = 1;
256 while (normalizedTicksPerWheel < ticksPerWheel) {
257 normalizedTicksPerWheel <<= 1;
258 }
259 return normalizedTicksPerWheel;
260 }
261
262
263
264
265
266
267
268
269 public void start() {
270 switch (WORKER_STATE_UPDATER.get(this)) {
271 case WORKER_STATE_INIT:
272 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
273 workerThread.start();
274 }
275 break;
276 case WORKER_STATE_STARTED:
277 break;
278 case WORKER_STATE_SHUTDOWN:
279 throw new IllegalStateException("cannot be started once stopped");
280 default:
281 throw new Error("Invalid WorkerState");
282 }
283
284
285 while (startTime == 0) {
286 try {
287 startTimeInitialized.await();
288 } catch (InterruptedException ignore) {
289
290 }
291 }
292 }
293
294 public Set<Timeout> stop() {
295 if (Thread.currentThread() == workerThread) {
296 throw new IllegalStateException(
297 HashedWheelTimer.class.getSimpleName() +
298 ".stop() cannot be called from " +
299 TimerTask.class.getSimpleName());
300 }
301
302 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
303
304 WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN);
305
306 misuseDetector.decrease();
307
308 return Collections.emptySet();
309 }
310
311 boolean interrupted = false;
312 while (workerThread.isAlive()) {
313 workerThread.interrupt();
314 try {
315 workerThread.join(100);
316 } catch (InterruptedException e) {
317 interrupted = true;
318 }
319 }
320
321 if (interrupted) {
322 Thread.currentThread().interrupt();
323 }
324
325 misuseDetector.decrease();
326
327 return worker.unprocessedTimeouts();
328 }
329
330 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
331 if (task == null) {
332 throw new NullPointerException("task");
333 }
334 if (unit == null) {
335 throw new NullPointerException("unit");
336 }
337 start();
338
339
340
341 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
342 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
343 timeouts.add(timeout);
344 return timeout;
345 }
346
347 private final class Worker implements Runnable {
348 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
349
350 private long tick;
351
352 public void run() {
353
354 startTime = System.nanoTime();
355 if (startTime == 0) {
356
357 startTime = 1;
358 }
359
360
361 startTimeInitialized.countDown();
362
363 do {
364 final long deadline = waitForNextTick();
365 if (deadline > 0) {
366 transferTimeoutsToBuckets();
367 HashedWheelBucket bucket =
368 wheel[(int) (tick & mask)];
369 bucket.expireTimeouts(deadline);
370 tick++;
371 }
372 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
373
374
375 for (HashedWheelBucket bucket: wheel) {
376 bucket.clearTimeouts(unprocessedTimeouts);
377 }
378 for (;;) {
379 HashedWheelTimeout timeout = timeouts.poll();
380 if (timeout == null) {
381 break;
382 }
383 unprocessedTimeouts.add(timeout);
384 }
385 }
386
387 private void transferTimeoutsToBuckets() {
388
389
390 for (int i = 0; i < 100000; i++) {
391 HashedWheelTimeout timeout = timeouts.poll();
392 if (timeout == null) {
393
394 break;
395 }
396 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
397 || !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
398
399
400 timeout.remove();
401 continue;
402 }
403 long calculated = timeout.deadline / tickDuration;
404 long remainingRounds = (calculated - tick) / wheel.length;
405 timeout.remainingRounds = remainingRounds;
406
407 final long ticks = Math.max(calculated, tick);
408 int stopIndex = (int) (ticks & mask);
409
410 HashedWheelBucket bucket = wheel[stopIndex];
411 bucket.addTimeout(timeout);
412 }
413 }
414
415
416
417
418
419
420 private long waitForNextTick() {
421 long deadline = tickDuration * (tick + 1);
422
423 for (;;) {
424 final long currentTime = System.nanoTime() - startTime;
425 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
426
427 if (sleepTimeMs <= 0) {
428 if (currentTime == Long.MIN_VALUE) {
429 return -Long.MAX_VALUE;
430 } else {
431 return currentTime;
432 }
433 }
434
435
436
437
438
439
440 if (DetectionUtil.isWindows()) {
441 sleepTimeMs = sleepTimeMs / 10 * 10;
442 }
443
444 try {
445 Thread.sleep(sleepTimeMs);
446 } catch (InterruptedException e) {
447 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
448 return Long.MIN_VALUE;
449 }
450 }
451 }
452 }
453
454 public Set<Timeout> unprocessedTimeouts() {
455 return Collections.unmodifiableSet(unprocessedTimeouts);
456 }
457 }
458
459 private static final class HashedWheelTimeout implements Timeout {
460
461 private static final int ST_INIT = 0;
462 private static final int ST_IN_BUCKET = 1;
463 private static final int ST_CANCELLED = 2;
464 private static final int ST_EXPIRED = 3;
465 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
466 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
467
468 private final HashedWheelTimer timer;
469 private final TimerTask task;
470 private final long deadline;
471
472 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
473 private volatile int state = ST_INIT;
474
475
476
477 long remainingRounds;
478
479
480
481 HashedWheelTimeout next;
482 HashedWheelTimeout prev;
483
484
485 HashedWheelBucket bucket;
486
487 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
488 this.timer = timer;
489 this.task = task;
490 this.deadline = deadline;
491 }
492
493 public Timer getTimer() {
494 return timer;
495 }
496
497 public TimerTask getTask() {
498 return task;
499 }
500
501 public void cancel() {
502 int state = state();
503 if (state >= ST_CANCELLED) {
504
505 return;
506 }
507 if (state != ST_IN_BUCKET && compareAndSetState(ST_INIT, ST_CANCELLED)) {
508
509
510
511 return;
512 }
513
514 if (!compareAndSetState(ST_IN_BUCKET, ST_CANCELLED)) {
515 return;
516 }
517
518
519
520 timer.timeouts.add(this);
521 }
522
523 public void remove() {
524 if (bucket != null) {
525 bucket.remove(this);
526 }
527 }
528
529 public boolean compareAndSetState(int expected, int state) {
530 return STATE_UPDATER.compareAndSet(this, expected, state);
531 }
532
533 public int state() {
534 return state;
535 }
536
537 public boolean isCancelled() {
538 return state == ST_CANCELLED;
539 }
540
541 public boolean isExpired() {
542 return state > ST_IN_BUCKET;
543 }
544
545 public HashedWheelTimeout value() {
546 return this;
547 }
548
549 public void expire() {
550 if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) {
551 assert state() != ST_INIT;
552 return;
553 }
554
555 try {
556 task.run(this);
557 } catch (Throwable t) {
558 if (logger.isWarnEnabled()) {
559 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
560 }
561 }
562 }
563
564 @Override
565 public String toString() {
566 final long currentTime = System.nanoTime();
567 long remaining = deadline - currentTime + timer.startTime;
568
569 StringBuilder buf = new StringBuilder(192);
570 buf.append(getClass().getSimpleName());
571 buf.append('(');
572
573 buf.append("deadline: ");
574 if (remaining > 0) {
575 buf.append(remaining);
576 buf.append(" ns later");
577 } else if (remaining < 0) {
578 buf.append(-remaining);
579 buf.append(" ns ago");
580 } else {
581 buf.append("now");
582 }
583
584 if (isCancelled()) {
585 buf.append(", cancelled");
586 }
587
588 buf.append(", task: ");
589 buf.append(getTask());
590
591 return buf.append(')').toString();
592 }
593 }
594
595
596
597
598
599
600 private static final class HashedWheelBucket {
601
602
603 private HashedWheelTimeout head;
604 private HashedWheelTimeout tail;
605
606
607
608
609 public void addTimeout(HashedWheelTimeout timeout) {
610 assert timeout.bucket == null;
611 timeout.bucket = this;
612 if (head == null) {
613 head = tail = timeout;
614 } else {
615 tail.next = timeout;
616 timeout.prev = tail;
617 tail = timeout;
618 }
619 }
620
621
622
623
624 public void expireTimeouts(long deadline) {
625 HashedWheelTimeout timeout = head;
626
627
628 while (timeout != null) {
629 boolean remove = false;
630 if (timeout.remainingRounds <= 0) {
631 if (timeout.deadline <= deadline) {
632 timeout.expire();
633 } else {
634
635 throw new IllegalStateException(String.format(
636 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
637 }
638 remove = true;
639 } else if (timeout.isCancelled()) {
640 remove = true;
641 } else {
642 timeout.remainingRounds --;
643 }
644
645 HashedWheelTimeout next = timeout.next;
646 if (remove) {
647 remove(timeout);
648 }
649 timeout = next;
650 }
651 }
652
653 public void remove(HashedWheelTimeout timeout) {
654 HashedWheelTimeout next = timeout.next;
655
656 if (timeout.prev != null) {
657 timeout.prev.next = next;
658 }
659 if (timeout.next != null) {
660 timeout.next.prev = timeout.prev;
661 }
662
663 if (timeout == head) {
664
665 if (timeout == tail) {
666 tail = null;
667 head = null;
668 } else {
669 head = next;
670 }
671 } else if (timeout == tail) {
672
673 tail = timeout.prev;
674 }
675
676 timeout.prev = null;
677 timeout.next = null;
678 timeout.bucket = null;
679 }
680
681
682
683
684 public void clearTimeouts(Set<Timeout> set) {
685 for (;;) {
686 HashedWheelTimeout timeout = pollTimeout();
687 if (timeout == null) {
688 return;
689 }
690 if (timeout.isExpired() || timeout.isCancelled()) {
691 continue;
692 }
693 set.add(timeout);
694 }
695 }
696
697 private HashedWheelTimeout pollTimeout() {
698 HashedWheelTimeout head = this.head;
699 if (head == null) {
700 return null;
701 }
702 HashedWheelTimeout next = head.next;
703 if (next == null) {
704 tail = this.head = null;
705 } else {
706 this.head = next;
707 next.prev = null;
708 }
709
710
711 head.next = null;
712 head.prev = null;
713 return head;
714 }
715 }
716 }