1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.util;
17
18 import org.jboss.netty.channel.ChannelPipelineFactory;
19 import org.jboss.netty.logging.InternalLogger;
20 import org.jboss.netty.logging.InternalLoggerFactory;
21 import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
22 import org.jboss.netty.util.internal.DetectionUtil;
23 import org.jboss.netty.util.internal.ReusableIterator;
24 import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
25
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Set;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.locks.ReadWriteLock;
36 import java.util.concurrent.locks.ReentrantReadWriteLock;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public class HashedWheelTimer implements Timer {
82
// Logger for this class; HashedWheelTimeout.expire() uses it to report exceptions thrown by tasks.
static final InternalLogger logger =
        InternalLoggerFactory.getInstance(HashedWheelTimer.class);
// Sequence number appended to the name of each timer's worker thread.
private static final AtomicInteger id = new AtomicInteger();

// increase() is called by the constructor and decrease() by stop().
// NOTE(review): presumably warns when too many timer instances are alive - confirm
// against SharedResourceMisuseDetector.
private static final SharedResourceMisuseDetector misuseDetector =
        new SharedResourceMisuseDetector(HashedWheelTimer.class);

// The tick loop, and the thread (created by the constructor, started by start()) that runs it.
private final Worker worker = new Worker();
final Thread workerThread;
// Lifecycle state: 0 = not started yet, 1 = started, 2 = stopped (see start() and stop()).
final AtomicInteger workerState = new AtomicInteger();

// Time for one full revolution of the wheel, in milliseconds (tickDuration * wheel.length).
private final long roundDuration;
// Time between two ticks, in milliseconds.
final long tickDuration;
// The wheel itself: one bucket of pending timeouts per tick slot.
final Set<HashedWheelTimeout>[] wheel;
// One reusable iterator per bucket, at the same index as the bucket in 'wheel'.
final ReusableIterator<HashedWheelTimeout>[] iterators;
// wheel.length - 1; the wheel length is a power of two, so '& mask' wraps an index into the wheel.
final int mask;
// Read lock: held while a timeout is added to the wheel.
// Write lock: held by the worker while it advances the cursor and scans a bucket.
final ReadWriteLock lock = new ReentrantReadWriteLock();
// Index of the bucket most recently selected by the worker; only the worker advances it.
volatile int wheelCursor;
101
102
103
104
105
106
/**
 * Creates a new timer that obtains its worker thread from
 * {@link Executors#defaultThreadFactory()} and otherwise uses the defaults
 * of the {@code (ThreadFactory)} constructor it delegates to.
 */
public HashedWheelTimer() {
    this(Executors.defaultThreadFactory());
}
110
111
112
113
114
115
116
117
118
/**
 * Creates a new timer with the given tick duration, using
 * {@link Executors#defaultThreadFactory()} to create the worker thread.
 *
 * @param tickDuration the duration between two ticks
 * @param unit         the time unit of {@code tickDuration}
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
    this(Executors.defaultThreadFactory(), tickDuration, unit);
}
122
123
124
125
126
127
128
129
130
/**
 * Creates a new timer with the given tick duration and wheel size, using
 * {@link Executors#defaultThreadFactory()} to create the worker thread.
 *
 * @param tickDuration  the duration between two ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
134
135
136
137
138
139
140
141
142
/**
 * Creates a new timer with a tick duration of 100 milliseconds.
 *
 * @param threadFactory the factory used to create the worker thread
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
146
147
148
149
150
151
152
153
154
155
/**
 * Creates a new timer with a wheel of 512 slots.
 *
 * @param threadFactory the factory used to create the worker thread
 * @param tickDuration  the duration between two ticks
 * @param unit          the time unit of {@code tickDuration}
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);
}
160
161
162
163
164
165
166
167
168
169
170
171 public HashedWheelTimer(
172 ThreadFactory threadFactory,
173 long tickDuration, TimeUnit unit, int ticksPerWheel) {
174
175 if (threadFactory == null) {
176 throw new NullPointerException("threadFactory");
177 }
178 if (unit == null) {
179 throw new NullPointerException("unit");
180 }
181 if (tickDuration <= 0) {
182 throw new IllegalArgumentException(
183 "tickDuration must be greater than 0: " + tickDuration);
184 }
185 if (ticksPerWheel <= 0) {
186 throw new IllegalArgumentException(
187 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
188 }
189
190
191 wheel = createWheel(ticksPerWheel);
192 iterators = createIterators(wheel);
193 mask = wheel.length - 1;
194
195
196 this.tickDuration = tickDuration = unit.toMillis(tickDuration);
197
198
199 if (tickDuration == Long.MAX_VALUE ||
200 tickDuration >= Long.MAX_VALUE / wheel.length) {
201 throw new IllegalArgumentException(
202 "tickDuration is too long: " +
203 tickDuration + ' ' + unit);
204 }
205
206 roundDuration = tickDuration * wheel.length;
207
208 workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
209 worker, "Hashed wheel timer #" + id.incrementAndGet()));
210
211
212 misuseDetector.increase();
213 }
214
215 @SuppressWarnings("unchecked")
216 private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
217 if (ticksPerWheel <= 0) {
218 throw new IllegalArgumentException(
219 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
220 }
221 if (ticksPerWheel > 1073741824) {
222 throw new IllegalArgumentException(
223 "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
224 }
225
226 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
227 Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
228 for (int i = 0; i < wheel.length; i ++) {
229 wheel[i] = new MapBackedSet<HashedWheelTimeout>(
230 new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
231 }
232 return wheel;
233 }
234
235 @SuppressWarnings("unchecked")
236 private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
237 ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
238 for (int i = 0; i < wheel.length; i ++) {
239 iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
240 }
241 return iterators;
242 }
243
244 private static int normalizeTicksPerWheel(int ticksPerWheel) {
245 int normalizedTicksPerWheel = 1;
246 while (normalizedTicksPerWheel < ticksPerWheel) {
247 normalizedTicksPerWheel <<= 1;
248 }
249 return normalizedTicksPerWheel;
250 }
251
252
253
254
255
256
257
258
259 public void start() {
260 switch (workerState.get()) {
261 case 0:
262 if (workerState.compareAndSet(0, 1)) {
263 workerThread.start();
264 }
265 break;
266 case 1:
267 break;
268 case 2:
269 throw new IllegalStateException("cannot be started once stopped");
270 default:
271 throw new Error();
272 }
273 }
274
275 public Set<Timeout> stop() {
276 if (Thread.currentThread() == workerThread) {
277 throw new IllegalStateException(
278 HashedWheelTimer.class.getSimpleName() +
279 ".stop() cannot be called from " +
280 TimerTask.class.getSimpleName());
281 }
282
283 if (workerState.getAndSet(2) != 1) {
284
285 return Collections.emptySet();
286 }
287
288 boolean interrupted = false;
289 while (workerThread.isAlive()) {
290 workerThread.interrupt();
291 try {
292 workerThread.join(100);
293 } catch (InterruptedException e) {
294 interrupted = true;
295 }
296 }
297
298 if (interrupted) {
299 Thread.currentThread().interrupt();
300 }
301
302 misuseDetector.decrease();
303
304 Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
305 for (Set<HashedWheelTimeout> bucket: wheel) {
306 unprocessedTimeouts.addAll(bucket);
307 bucket.clear();
308 }
309
310 return Collections.unmodifiableSet(unprocessedTimeouts);
311 }
312
313 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
314 final long currentTime = System.currentTimeMillis();
315
316 if (task == null) {
317 throw new NullPointerException("task");
318 }
319 if (unit == null) {
320 throw new NullPointerException("unit");
321 }
322
323 start();
324
325 delay = unit.toMillis(delay);
326 HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay);
327 scheduleTimeout(timeout, delay);
328 return timeout;
329 }
330
331 void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
332
333
334 if (delay < tickDuration) {
335 delay = tickDuration;
336 }
337
338
339 final long lastRoundDelay = delay % roundDuration;
340 final long lastTickDelay = delay % tickDuration;
341 final long relativeIndex =
342 lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
343
344 final long remainingRounds =
345 delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
346
347
348 lock.readLock().lock();
349 try {
350 int stopIndex = (int) (wheelCursor + relativeIndex & mask);
351 timeout.stopIndex = stopIndex;
352 timeout.remainingRounds = remainingRounds;
353
354 wheel[stopIndex].add(timeout);
355 } finally {
356 lock.readLock().unlock();
357 }
358 }
359
360 private final class Worker implements Runnable {
361
362 private long startTime;
363 private long tick;
364
365 Worker() {
366 }
367
368 public void run() {
369 List<HashedWheelTimeout> expiredTimeouts =
370 new ArrayList<HashedWheelTimeout>();
371
372 startTime = System.currentTimeMillis();
373 tick = 1;
374
375 while (workerState.get() == 1) {
376 final long deadline = waitForNextTick();
377 if (deadline > 0) {
378 fetchExpiredTimeouts(expiredTimeouts, deadline);
379 notifyExpiredTimeouts(expiredTimeouts);
380 }
381 }
382 }
383
384 private void fetchExpiredTimeouts(
385 List<HashedWheelTimeout> expiredTimeouts, long deadline) {
386
387
388
389
390
391 lock.writeLock().lock();
392 try {
393 int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
394 ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
395 fetchExpiredTimeouts(expiredTimeouts, i, deadline);
396 } finally {
397 lock.writeLock().unlock();
398 }
399 }
400
401 private void fetchExpiredTimeouts(
402 List<HashedWheelTimeout> expiredTimeouts,
403 ReusableIterator<HashedWheelTimeout> i, long deadline) {
404
405 List<HashedWheelTimeout> slipped = null;
406 i.rewind();
407 while (i.hasNext()) {
408 HashedWheelTimeout timeout = i.next();
409 if (timeout.remainingRounds <= 0) {
410 i.remove();
411 if (timeout.deadline <= deadline) {
412 expiredTimeouts.add(timeout);
413 } else {
414
415
416
417
418 if (slipped == null) {
419 slipped = new ArrayList<HashedWheelTimer.HashedWheelTimeout>();
420 }
421 slipped.add(timeout);
422 }
423 } else {
424 timeout.remainingRounds --;
425 }
426 }
427
428
429 if (slipped != null) {
430 for (HashedWheelTimeout timeout: slipped) {
431 scheduleTimeout(timeout, timeout.deadline - deadline);
432 }
433 }
434 }
435
436 private void notifyExpiredTimeouts(
437 List<HashedWheelTimeout> expiredTimeouts) {
438
439 for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
440 expiredTimeouts.get(i).expire();
441 }
442
443
444 expiredTimeouts.clear();
445 }
446
447 private long waitForNextTick() {
448 long deadline = startTime + tickDuration * tick;
449
450 for (;;) {
451 final long currentTime = System.currentTimeMillis();
452 long sleepTime = tickDuration * tick - (currentTime - startTime);
453
454
455
456
457
458
459 if (DetectionUtil.isWindows()) {
460 sleepTime = sleepTime / 10 * 10;
461 }
462
463 if (sleepTime <= 0) {
464 break;
465 }
466 try {
467 Thread.sleep(sleepTime);
468 } catch (InterruptedException e) {
469 if (workerState.get() != 1) {
470 return -1;
471 }
472 }
473 }
474
475
476 tick ++;
477 return deadline;
478 }
479 }
480
481 private final class HashedWheelTimeout implements Timeout {
482
483 private static final int ST_INIT = 0;
484 private static final int ST_CANCELLED = 1;
485 private static final int ST_EXPIRED = 2;
486
487 private final TimerTask task;
488 final long deadline;
489 volatile int stopIndex;
490 volatile long remainingRounds;
491 private final AtomicInteger state = new AtomicInteger(ST_INIT);
492
493 HashedWheelTimeout(TimerTask task, long deadline) {
494 this.task = task;
495 this.deadline = deadline;
496 }
497
498 public Timer getTimer() {
499 return HashedWheelTimer.this;
500 }
501
502 public TimerTask getTask() {
503 return task;
504 }
505
506 public void cancel() {
507 if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
508
509 return;
510 }
511
512 wheel[stopIndex].remove(this);
513 }
514
515 public boolean isCancelled() {
516 return state.get() == ST_CANCELLED;
517 }
518
519 public boolean isExpired() {
520 return state.get() != ST_INIT;
521 }
522
523 public void expire() {
524 if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
525 return;
526 }
527
528 try {
529 task.run(this);
530 } catch (Throwable t) {
531 if (logger.isWarnEnabled()) {
532 logger.warn(
533 "An exception was thrown by " +
534 TimerTask.class.getSimpleName() + '.', t);
535 }
536
537 }
538 }
539
540 @Override
541 public String toString() {
542 long currentTime = System.currentTimeMillis();
543 long remaining = deadline - currentTime;
544
545 StringBuilder buf = new StringBuilder(192);
546 buf.append(getClass().getSimpleName());
547 buf.append('(');
548
549 buf.append("deadline: ");
550 if (remaining > 0) {
551 buf.append(remaining);
552 buf.append(" ms later, ");
553 } else if (remaining < 0) {
554 buf.append(-remaining);
555 buf.append(" ms ago, ");
556 } else {
557 buf.append("now, ");
558 }
559
560 if (isCancelled()) {
561 buf.append(", cancelled");
562 }
563
564 return buf.append(')').toString();
565 }
566 }
567 }