1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.util;
17
18 import org.jboss.netty.channel.ChannelPipelineFactory;
19 import org.jboss.netty.logging.InternalLogger;
20 import org.jboss.netty.logging.InternalLoggerFactory;
21 import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
22 import org.jboss.netty.util.internal.DetectionUtil;
23 import org.jboss.netty.util.internal.ReusableIterator;
24 import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
25
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Set;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.locks.ReadWriteLock;
36 import java.util.concurrent.locks.ReentrantReadWriteLock;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public class HashedWheelTimer implements Timer {
82
83 static final InternalLogger logger =
84 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
85 private static final AtomicInteger id = new AtomicInteger();
86
87 private static final SharedResourceMisuseDetector misuseDetector =
88 new SharedResourceMisuseDetector(HashedWheelTimer.class);
89
90 private final Worker worker = new Worker();
91 final Thread workerThread;
92
93 public static final int WORKER_STATE_INIT = 0;
94 public static final int WORKER_STATE_STARTED = 1;
95 public static final int WORKER_STATE_SHUTDOWN = 2;
96 final AtomicInteger workerState = new AtomicInteger();
97
98 final long tickDuration;
99 final Set<HashedWheelTimeout>[] wheel;
100 final ReusableIterator<HashedWheelTimeout>[] iterators;
101 final int mask;
102 final ReadWriteLock lock = new ReentrantReadWriteLock();
103 volatile int wheelCursor;
104
105
106
107
108
109
/**
 * Creates a new timer whose worker thread comes from
 * {@link Executors#defaultThreadFactory()}. All other settings are the ones
 * chosen by {@link #HashedWheelTimer(ThreadFactory)}.
 */
110 public HashedWheelTimer() {
111 this(Executors.defaultThreadFactory());
112 }
113
114
115
116
117
118
119
120
121
/**
 * Creates a new timer with the given tick duration, using
 * {@link Executors#defaultThreadFactory()} for the worker thread.
 *
 * @param tickDuration the duration between ticks
 * @param unit         the time unit of {@code tickDuration}
 */
122 public HashedWheelTimer(long tickDuration, TimeUnit unit) {
123 this(Executors.defaultThreadFactory(), tickDuration, unit);
124 }
125
126
127
128
129
130
131
132
133
/**
 * Creates a new timer with the given tick duration and wheel size, using
 * {@link Executors#defaultThreadFactory()} for the worker thread.
 *
 * @param tickDuration  the duration between ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 */
134 public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
135 this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
136 }
137
138
139
140
141
142
143
144
145
/**
 * Creates a new timer with a tick duration of 100 milliseconds.
 *
 * @param threadFactory the factory that creates the worker thread
 */
146 public HashedWheelTimer(ThreadFactory threadFactory) {
147 this(threadFactory, 100, TimeUnit.MILLISECONDS);
148 }
149
150
151
152
153
154
155
156
157
158
/**
 * Creates a new timer with a requested wheel size of 512 slots.
 *
 * @param threadFactory the factory that creates the worker thread
 * @param tickDuration  the duration between ticks
 * @param unit          the time unit of {@code tickDuration}
 */
159 public HashedWheelTimer(
160 ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
161 this(threadFactory, tickDuration, unit, 512);
162 }
163
164
165
166
167
168
169
170
171
172
173
/**
 * Creates a new timer, passing {@code null} as the thread name determiner
 * to the five-argument constructor.
 *
 * @param threadFactory the factory that creates the worker thread
 * @param tickDuration  the duration between ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 */
174 public HashedWheelTimer(
175 ThreadFactory threadFactory,
176 long tickDuration, TimeUnit unit, int ticksPerWheel) {
177 this(threadFactory, null, tickDuration, unit, ticksPerWheel);
178 }
179
180
181
182
183
184
185
186
187
188
189
190
191 public HashedWheelTimer(
192 ThreadFactory threadFactory,
193 ThreadNameDeterminer determiner,
194 long tickDuration, TimeUnit unit, int ticksPerWheel) {
195
196 if (threadFactory == null) {
197 throw new NullPointerException("threadFactory");
198 }
199 if (unit == null) {
200 throw new NullPointerException("unit");
201 }
202 if (tickDuration <= 0) {
203 throw new IllegalArgumentException(
204 "tickDuration must be greater than 0: " + tickDuration);
205 }
206 if (ticksPerWheel <= 0) {
207 throw new IllegalArgumentException(
208 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
209 }
210
211
212 wheel = createWheel(ticksPerWheel);
213 iterators = createIterators(wheel);
214 mask = wheel.length - 1;
215
216
217 this.tickDuration = unit.toNanos(tickDuration);
218
219
220 if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
221 throw new IllegalArgumentException(String.format(
222 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
223 tickDuration, Long.MAX_VALUE / wheel.length));
224 }
225
226 workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
227 worker, "Hashed wheel timer #" + id.incrementAndGet(),
228 determiner));
229
230
231 misuseDetector.increase();
232 }
233
234 @SuppressWarnings("unchecked")
235 private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
236 if (ticksPerWheel <= 0) {
237 throw new IllegalArgumentException(
238 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
239 }
240 if (ticksPerWheel > 1073741824) {
241 throw new IllegalArgumentException(
242 "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
243 }
244
245 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
246 Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
247 for (int i = 0; i < wheel.length; i ++) {
248 wheel[i] = new MapBackedSet<HashedWheelTimeout>(
249 new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
250 }
251 return wheel;
252 }
253
254 @SuppressWarnings("unchecked")
255 private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
256 ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
257 for (int i = 0; i < wheel.length; i ++) {
258 iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
259 }
260 return iterators;
261 }
262
263 private static int normalizeTicksPerWheel(int ticksPerWheel) {
264 int normalizedTicksPerWheel = 1;
265 while (normalizedTicksPerWheel < ticksPerWheel) {
266 normalizedTicksPerWheel <<= 1;
267 }
268 return normalizedTicksPerWheel;
269 }
270
271
272
273
274
275
276
277
278 public void start() {
279 switch (workerState.get()) {
280 case WORKER_STATE_INIT:
281 if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
282 workerThread.start();
283 }
284 break;
285 case WORKER_STATE_STARTED:
286 break;
287 case WORKER_STATE_SHUTDOWN:
288 throw new IllegalStateException("cannot be started once stopped");
289 default:
290 throw new Error("Invalid WorkerState");
291 }
292 }
293
294 public Set<Timeout> stop() {
295 if (Thread.currentThread() == workerThread) {
296 throw new IllegalStateException(
297 HashedWheelTimer.class.getSimpleName() +
298 ".stop() cannot be called from " +
299 TimerTask.class.getSimpleName());
300 }
301
302 if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
303
304 workerState.set(WORKER_STATE_SHUTDOWN);
305 return Collections.emptySet();
306 }
307
308 boolean interrupted = false;
309 while (workerThread.isAlive()) {
310 workerThread.interrupt();
311 try {
312 workerThread.join(100);
313 } catch (InterruptedException e) {
314 interrupted = true;
315 }
316 }
317
318 if (interrupted) {
319 Thread.currentThread().interrupt();
320 }
321
322 misuseDetector.decrease();
323
324 Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
325 for (Set<HashedWheelTimeout> bucket: wheel) {
326 unprocessedTimeouts.addAll(bucket);
327 bucket.clear();
328 }
329
330 return Collections.unmodifiableSet(unprocessedTimeouts);
331 }
332
333 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
334 final long currentTime = System.nanoTime();
335
336 if (task == null) {
337 throw new NullPointerException("task");
338 }
339 if (unit == null) {
340 throw new NullPointerException("unit");
341 }
342
343 start();
344
345 long delayInNanos = unit.toNanos(delay);
346 HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delayInNanos);
347 scheduleTimeout(timeout, delayInNanos);
348 return timeout;
349 }
350
351 void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
352
353
354 long relativeIndex = (delay + tickDuration - 1) / tickDuration;
355
356
357
358 if (relativeIndex < 0) {
359 relativeIndex = delay / tickDuration;
360 }
361 if (relativeIndex == 0) {
362 relativeIndex = 1;
363 }
364 if ((relativeIndex & mask) == 0) {
365 relativeIndex--;
366 }
367 final long remainingRounds = relativeIndex / wheel.length;
368
369
370 lock.readLock().lock();
371 try {
372 if (workerState.get() == WORKER_STATE_SHUTDOWN) {
373 throw new IllegalStateException("Cannot enqueue after shutdown");
374 }
375 final int stopIndex = (int) (wheelCursor + relativeIndex & mask);
376 timeout.stopIndex = stopIndex;
377 timeout.remainingRounds = remainingRounds;
378
379 wheel[stopIndex].add(timeout);
380 } finally {
381 lock.readLock().unlock();
382 }
383 }
384
385 private final class Worker implements Runnable {
386
387 private long startTime;
388 private long tick;
389
390 Worker() {
391 }
392
393 public void run() {
394 List<HashedWheelTimeout> expiredTimeouts =
395 new ArrayList<HashedWheelTimeout>();
396
397 startTime = System.nanoTime();
398 tick = 1;
399
400 while (workerState.get() == WORKER_STATE_STARTED) {
401 final long deadline = waitForNextTick();
402 if (deadline > 0) {
403 fetchExpiredTimeouts(expiredTimeouts, deadline);
404 notifyExpiredTimeouts(expiredTimeouts);
405 }
406 }
407 }
408
409 private void fetchExpiredTimeouts(
410 List<HashedWheelTimeout> expiredTimeouts, long deadline) {
411
412
413
414
415
416 lock.writeLock().lock();
417 try {
418 int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
419 ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
420 fetchExpiredTimeouts(expiredTimeouts, i, deadline);
421 } finally {
422 lock.writeLock().unlock();
423 }
424 }
425
426 private void fetchExpiredTimeouts(
427 List<HashedWheelTimeout> expiredTimeouts,
428 ReusableIterator<HashedWheelTimeout> i, long deadline) {
429
430 List<HashedWheelTimeout> slipped = null;
431 i.rewind();
432 while (i.hasNext()) {
433 HashedWheelTimeout timeout = i.next();
434 if (timeout.remainingRounds <= 0) {
435 i.remove();
436 if (timeout.deadline <= deadline) {
437 expiredTimeouts.add(timeout);
438 } else {
439
440
441
442
443 if (slipped == null) {
444 slipped = new ArrayList<HashedWheelTimeout>();
445 }
446 slipped.add(timeout);
447 }
448 } else {
449 timeout.remainingRounds --;
450 }
451 }
452
453
454 if (slipped != null) {
455 for (HashedWheelTimeout timeout: slipped) {
456 scheduleTimeout(timeout, timeout.deadline - deadline);
457 }
458 }
459 }
460
461 private void notifyExpiredTimeouts(
462 List<HashedWheelTimeout> expiredTimeouts) {
463
464 for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
465 expiredTimeouts.get(i).expire();
466 }
467
468
469 expiredTimeouts.clear();
470 }
471
472
473
474
475
476
477
478 private long waitForNextTick() {
479 long deadline = startTime + tickDuration * tick;
480
481 for (;;) {
482 final long currentTime = System.nanoTime();
483 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
484
485 if (sleepTimeMs <= 0) {
486 tick += 1;
487 if (currentTime == Long.MIN_VALUE) {
488 return -Long.MAX_VALUE;
489 } else {
490 return currentTime;
491 }
492 }
493
494
495
496
497
498
499 if (DetectionUtil.isWindows()) {
500 sleepTimeMs = sleepTimeMs / 10 * 10;
501 }
502 try {
503 Thread.sleep(sleepTimeMs);
504 } catch (InterruptedException e) {
505 if (workerState.get() == WORKER_STATE_SHUTDOWN) {
506 return Long.MIN_VALUE;
507 }
508 }
509 }
510 }
511 }
512
513 private final class HashedWheelTimeout implements Timeout {
514
515 private static final int ST_INIT = 0;
516 private static final int ST_CANCELLED = 1;
517 private static final int ST_EXPIRED = 2;
518
519 private final TimerTask task;
520 final long deadline;
521 volatile int stopIndex;
522 volatile long remainingRounds;
523 private final AtomicInteger state = new AtomicInteger(ST_INIT);
524
525 HashedWheelTimeout(TimerTask task, long deadline) {
526 this.task = task;
527 this.deadline = deadline;
528 }
529
530 public Timer getTimer() {
531 return HashedWheelTimer.this;
532 }
533
534 public TimerTask getTask() {
535 return task;
536 }
537
538 public void cancel() {
539 if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
540
541 return;
542 }
543
544 wheel[stopIndex].remove(this);
545 }
546
547 public boolean isCancelled() {
548 return state.get() == ST_CANCELLED;
549 }
550
551 public boolean isExpired() {
552 return state.get() != ST_INIT;
553 }
554
555 public void expire() {
556 if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
557 return;
558 }
559
560 try {
561 task.run(this);
562 } catch (Throwable t) {
563 if (logger.isWarnEnabled()) {
564 logger.warn(
565 "An exception was thrown by " +
566 TimerTask.class.getSimpleName() + '.', t);
567 }
568 }
569 }
570
571 @Override
572 public String toString() {
573 long currentTime = System.nanoTime();
574 long remaining = deadline - currentTime;
575
576 StringBuilder buf = new StringBuilder(192);
577 buf.append(getClass().getSimpleName());
578 buf.append('(');
579
580 buf.append("deadline: ");
581 if (remaining > 0) {
582 buf.append(remaining);
583 buf.append(" ms later, ");
584 } else if (remaining < 0) {
585 buf.append(-remaining);
586 buf.append(" ms ago, ");
587 } else {
588 buf.append("now, ");
589 }
590
591 if (isCancelled()) {
592 buf.append(", cancelled");
593 }
594
595 return buf.append(')').toString();
596 }
597 }
598 }