1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.util;
17
18 import io.netty.util.internal.PlatformDependent;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.util.Collections;
23 import java.util.HashSet;
24 import java.util.Queue;
25 import java.util.Set;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import static io.netty.util.internal.StringUtil.simpleClassName;
37
38 /**
39 * A {@link Timer} optimized for approximated I/O timeout scheduling.
40 *
41 * <h3>Tick Duration</h3>
42 *
43 * As described with 'approximated', this timer does not execute the scheduled
44 * {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
45 * check if there are any {@link TimerTask}s behind the schedule and execute
46 * them.
47 * <p>
48 * You can increase or decrease the accuracy of the execution timing by
49 * specifying smaller or larger tick duration in the constructor. In most
50 * network applications, I/O timeout does not need to be accurate. Therefore,
51 * the default tick duration is 100 milliseconds and you will not need to try
52 * different configurations in most cases.
53 *
54 * <h3>Ticks per Wheel (Wheel Size)</h3>
55 *
 * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
 * Put simply, a wheel is a hash table of {@link TimerTask}s whose hash
 * function is 'the deadline of the task'. The default number of ticks per
 * wheel (i.e. the size of the wheel) is 512. You can specify a larger value
 * if you are going to schedule a lot of timeouts.
61 *
62 * <h3>Do not create many instances.</h3>
63 *
64 * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
65 * started. Therefore, you should make sure to create only one instance and
66 * share it across your application. One of the common mistakes, that makes
67 * your application unresponsive, is to create a new instance for every connection.
68 *
69 * <h3>Implementation Details</h3>
70 *
71 * {@link HashedWheelTimer} is based on
72 * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
73 * Tony Lauck's paper,
74 * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
75 * and Hierarchical Timing Wheels: data structures to efficiently implement a
76 * timer facility'</a>. More comprehensive slides are located
77 * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
78 */
79 public class HashedWheelTimer implements Timer {
80
81 static final InternalLogger logger =
82 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
83
84 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
85 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
86 private static final int INSTANCE_COUNT_LIMIT = 64;
87 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
88 .newResourceLeakDetector(HashedWheelTimer.class, 1);
89
90 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
91 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
92
93 private final ResourceLeakTracker<HashedWheelTimer> leak;
94 private final Worker worker = new Worker();
95 private final Thread workerThread;
96
97 public static final int WORKER_STATE_INIT = 0;
98 public static final int WORKER_STATE_STARTED = 1;
99 public static final int WORKER_STATE_SHUTDOWN = 2;
100 @SuppressWarnings({ "unused", "FieldMayBeFinal" })
101 private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
102
103 private final long tickDuration;
104 private final HashedWheelBucket[] wheel;
105 private final int mask;
106 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
107 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
108 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
109 private final AtomicLong pendingTimeouts = new AtomicLong(0);
110 private final long maxPendingTimeouts;
111
112 private volatile long startTime;
113
/**
 * Creates a timer whose worker thread comes from
 * {@link Executors#defaultThreadFactory()} and which uses the default tick
 * duration and the default wheel size.
 */
public HashedWheelTimer() {
    this(Executors.defaultThreadFactory());
}
122
/**
 * Creates a timer whose worker thread comes from
 * {@link Executors#defaultThreadFactory()} and which uses the default wheel
 * size.
 *
 * @param tickDuration  the time that passes between two ticks
 * @param unit          the unit in which {@code tickDuration} is expressed
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
    this(Executors.defaultThreadFactory(), tickDuration, unit);
}
136
/**
 * Creates a timer whose worker thread comes from
 * {@link Executors#defaultThreadFactory()}.
 *
 * @param tickDuration   the time that passes between two ticks
 * @param unit           the unit in which {@code tickDuration} is expressed
 * @param ticksPerWheel  the requested number of slots in the wheel
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
150
/**
 * Creates a timer that ticks every 100 milliseconds and uses the default
 * wheel size.
 *
 * @param threadFactory  the {@link ThreadFactory} used to create the
 *                       background {@link Thread} that runs the
 *                       {@link TimerTask}s
 * @throws NullPointerException if {@code threadFactory} is {@code null}
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
163
/**
 * Creates a timer with a wheel of 512 slots.
 *
 * @param threadFactory  the {@link ThreadFactory} used to create the
 *                       background {@link Thread} that runs the
 *                       {@link TimerTask}s
 * @param tickDuration   the time that passes between two ticks
 * @param unit           the unit in which {@code tickDuration} is expressed
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);
}
179
/**
 * Creates a timer with leak detection always enabled.
 *
 * @param threadFactory  the {@link ThreadFactory} used to create the
 *                       background {@link Thread} that runs the
 *                       {@link TimerTask}s
 * @param tickDuration   the time that passes between two ticks
 * @param unit           the unit in which {@code tickDuration} is expressed
 * @param ticksPerWheel  the requested number of slots in the wheel
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
197
/**
 * Creates a timer without an upper bound on the number of pending timeouts.
 *
 * @param threadFactory  the {@link ThreadFactory} used to create the
 *                       background {@link Thread} that runs the
 *                       {@link TimerTask}s
 * @param tickDuration   the time that passes between two ticks
 * @param unit           the unit in which {@code tickDuration} is expressed
 * @param ticksPerWheel  the requested number of slots in the wheel
 * @param leakDetection  {@code true} to always track this timer for leaks;
 *                       with {@code false} it is tracked only when the
 *                       worker thread is not a daemon thread
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
        boolean leakDetection) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
218
219 /**
220 * Creates a new timer.
221 *
222 * @param threadFactory a {@link ThreadFactory} that creates a
223 * background {@link Thread} which is dedicated to
224 * {@link TimerTask} execution.
225 * @param tickDuration the duration between tick
226 * @param unit the time unit of the {@code tickDuration}
227 * @param ticksPerWheel the size of the wheel
228 * @param leakDetection {@code true} if leak detection should be enabled always,
229 * if false it will only be enabled if the worker thread is not
230 * a daemon thread.
231 * @param maxPendingTimeouts The maximum number of pending timeouts after which call to
232 * {@code newTimeout} will result in
233 * {@link java.util.concurrent.RejectedExecutionException}
234 * being thrown. No maximum pending timeouts limit is assumed if
235 * this value is 0 or negative.
236 * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
237 * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
238 */
239 public HashedWheelTimer(
240 ThreadFactory threadFactory,
241 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
242 long maxPendingTimeouts) {
243
244 if (threadFactory == null) {
245 throw new NullPointerException("threadFactory");
246 }
247 if (unit == null) {
248 throw new NullPointerException("unit");
249 }
250 if (tickDuration <= 0) {
251 throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
252 }
253 if (ticksPerWheel <= 0) {
254 throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
255 }
256
257 // Normalize ticksPerWheel to power of two and initialize the wheel.
258 wheel = createWheel(ticksPerWheel);
259 mask = wheel.length - 1;
260
261 // Convert tickDuration to nanos.
262 this.tickDuration = unit.toNanos(tickDuration);
263
264 // Prevent overflow.
265 if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
266 throw new IllegalArgumentException(String.format(
267 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
268 tickDuration, Long.MAX_VALUE / wheel.length));
269 }
270 workerThread = threadFactory.newThread(worker);
271
272 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
273
274 this.maxPendingTimeouts = maxPendingTimeouts;
275
276 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
277 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
278 reportTooManyInstances();
279 }
280 }
281
282 @Override
283 protected void finalize() throws Throwable {
284 try {
285 super.finalize();
286 } finally {
287 // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
288 // we have not yet shutdown then we want to make sure we decrement the active instance count.
289 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
290 INSTANCE_COUNTER.decrementAndGet();
291 }
292 }
293 }
294
295 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
296 if (ticksPerWheel <= 0) {
297 throw new IllegalArgumentException(
298 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
299 }
300 if (ticksPerWheel > 1073741824) {
301 throw new IllegalArgumentException(
302 "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
303 }
304
305 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
306 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
307 for (int i = 0; i < wheel.length; i ++) {
308 wheel[i] = new HashedWheelBucket();
309 }
310 return wheel;
311 }
312
313 private static int normalizeTicksPerWheel(int ticksPerWheel) {
314 int normalizedTicksPerWheel = 1;
315 while (normalizedTicksPerWheel < ticksPerWheel) {
316 normalizedTicksPerWheel <<= 1;
317 }
318 return normalizedTicksPerWheel;
319 }
320
321 /**
322 * Starts the background thread explicitly. The background thread will
323 * start automatically on demand even if you did not call this method.
324 *
325 * @throws IllegalStateException if this timer has been
326 * {@linkplain #stop() stopped} already
327 */
328 public void start() {
329 switch (WORKER_STATE_UPDATER.get(this)) {
330 case WORKER_STATE_INIT:
331 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
332 workerThread.start();
333 }
334 break;
335 case WORKER_STATE_STARTED:
336 break;
337 case WORKER_STATE_SHUTDOWN:
338 throw new IllegalStateException("cannot be started once stopped");
339 default:
340 throw new Error("Invalid WorkerState");
341 }
342
343 // Wait until the startTime is initialized by the worker.
344 while (startTime == 0) {
345 try {
346 startTimeInitialized.await();
347 } catch (InterruptedException ignore) {
348 // Ignore - it will be ready very soon.
349 }
350 }
351 }
352
353 @Override
354 public Set<Timeout> stop() {
355 if (Thread.currentThread() == workerThread) {
356 throw new IllegalStateException(
357 HashedWheelTimer.class.getSimpleName() +
358 ".stop() cannot be called from " +
359 TimerTask.class.getSimpleName());
360 }
361
362 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
363 // workerState can be 0 or 2 at this moment - let it always be 2.
364 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
365 INSTANCE_COUNTER.decrementAndGet();
366 if (leak != null) {
367 boolean closed = leak.close(this);
368 assert closed;
369 }
370 }
371
372 return Collections.emptySet();
373 }
374
375 try {
376 boolean interrupted = false;
377 while (workerThread.isAlive()) {
378 workerThread.interrupt();
379 try {
380 workerThread.join(100);
381 } catch (InterruptedException ignored) {
382 interrupted = true;
383 }
384 }
385
386 if (interrupted) {
387 Thread.currentThread().interrupt();
388 }
389 } finally {
390 INSTANCE_COUNTER.decrementAndGet();
391 if (leak != null) {
392 boolean closed = leak.close(this);
393 assert closed;
394 }
395 }
396 return worker.unprocessedTimeouts();
397 }
398
399 @Override
400 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
401 if (task == null) {
402 throw new NullPointerException("task");
403 }
404 if (unit == null) {
405 throw new NullPointerException("unit");
406 }
407
408 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
409
410 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
411 pendingTimeouts.decrementAndGet();
412 throw new RejectedExecutionException("Number of pending timeouts ("
413 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
414 + "timeouts (" + maxPendingTimeouts + ")");
415 }
416
417 start();
418
419 // Add the timeout to the timeout queue which will be processed on the next tick.
420 // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
421 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
422 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
423 timeouts.add(timeout);
424 return timeout;
425 }
426
427 /**
428 * Returns the number of pending timeouts of this {@link Timer}.
429 */
430 public long pendingTimeouts() {
431 return pendingTimeouts.get();
432 }
433
434 private static void reportTooManyInstances() {
435 String resourceType = simpleClassName(HashedWheelTimer.class);
436 logger.error("You are creating too many " + resourceType + " instances. " +
437 resourceType + " is a shared resource that must be reused across the JVM," +
438 "so that only a few instances are created.");
439 }
440
441 private final class Worker implements Runnable {
442 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
443
444 private long tick;
445
446 @Override
447 public void run() {
448 // Initialize the startTime.
449 startTime = System.nanoTime();
450 if (startTime == 0) {
451 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
452 startTime = 1;
453 }
454
455 // Notify the other threads waiting for the initialization at start().
456 startTimeInitialized.countDown();
457
458 do {
459 final long deadline = waitForNextTick();
460 if (deadline > 0) {
461 int idx = (int) (tick & mask);
462 processCancelledTasks();
463 HashedWheelBucket bucket =
464 wheel[idx];
465 transferTimeoutsToBuckets();
466 bucket.expireTimeouts(deadline);
467 tick++;
468 }
469 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
470
471 // Fill the unprocessedTimeouts so we can return them from stop() method.
472 for (HashedWheelBucket bucket: wheel) {
473 bucket.clearTimeouts(unprocessedTimeouts);
474 }
475 for (;;) {
476 HashedWheelTimeout timeout = timeouts.poll();
477 if (timeout == null) {
478 break;
479 }
480 if (!timeout.isCancelled()) {
481 unprocessedTimeouts.add(timeout);
482 }
483 }
484 processCancelledTasks();
485 }
486
487 private void transferTimeoutsToBuckets() {
488 // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
489 // adds new timeouts in a loop.
490 for (int i = 0; i < 100000; i++) {
491 HashedWheelTimeout timeout = timeouts.poll();
492 if (timeout == null) {
493 // all processed
494 break;
495 }
496 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
497 // Was cancelled in the meantime.
498 continue;
499 }
500
501 long calculated = timeout.deadline / tickDuration;
502 timeout.remainingRounds = (calculated - tick) / wheel.length;
503
504 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
505 int stopIndex = (int) (ticks & mask);
506
507 HashedWheelBucket bucket = wheel[stopIndex];
508 bucket.addTimeout(timeout);
509 }
510 }
511
512 private void processCancelledTasks() {
513 for (;;) {
514 HashedWheelTimeout timeout = cancelledTimeouts.poll();
515 if (timeout == null) {
516 // all processed
517 break;
518 }
519 try {
520 timeout.remove();
521 } catch (Throwable t) {
522 if (logger.isWarnEnabled()) {
523 logger.warn("An exception was thrown while process a cancellation task", t);
524 }
525 }
526 }
527 }
528
529 /**
530 * calculate goal nanoTime from startTime and current tick number,
531 * then wait until that goal has been reached.
532 * @return Long.MIN_VALUE if received a shutdown request,
533 * current time otherwise (with Long.MIN_VALUE changed by +1)
534 */
535 private long waitForNextTick() {
536 long deadline = tickDuration * (tick + 1);
537
538 for (;;) {
539 final long currentTime = System.nanoTime() - startTime;
540 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
541
542 if (sleepTimeMs <= 0) {
543 if (currentTime == Long.MIN_VALUE) {
544 return -Long.MAX_VALUE;
545 } else {
546 return currentTime;
547 }
548 }
549
550 // Check if we run on windows, as if thats the case we will need
551 // to round the sleepTime as workaround for a bug that only affect
552 // the JVM if it runs on windows.
553 //
554 // See https://github.com/netty/netty/issues/356
555 if (PlatformDependent.isWindows()) {
556 sleepTimeMs = sleepTimeMs / 10 * 10;
557 }
558
559 try {
560 Thread.sleep(sleepTimeMs);
561 } catch (InterruptedException ignored) {
562 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
563 return Long.MIN_VALUE;
564 }
565 }
566 }
567 }
568
569 public Set<Timeout> unprocessedTimeouts() {
570 return Collections.unmodifiableSet(unprocessedTimeouts);
571 }
572 }
573
574 private static final class HashedWheelTimeout implements Timeout {
575
576 private static final int ST_INIT = 0;
577 private static final int ST_CANCELLED = 1;
578 private static final int ST_EXPIRED = 2;
579 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
580 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
581
582 private final HashedWheelTimer timer;
583 private final TimerTask task;
584 private final long deadline;
585
586 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
587 private volatile int state = ST_INIT;
588
589 // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
590 // HashedWheelTimeout will be added to the correct HashedWheelBucket.
591 long remainingRounds;
592
593 // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
594 // As only the workerThread will act on it there is no need for synchronization / volatile.
595 HashedWheelTimeout next;
596 HashedWheelTimeout prev;
597
598 // The bucket to which the timeout was added
599 HashedWheelBucket bucket;
600
601 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
602 this.timer = timer;
603 this.task = task;
604 this.deadline = deadline;
605 }
606
607 @Override
608 public Timer timer() {
609 return timer;
610 }
611
612 @Override
613 public TimerTask task() {
614 return task;
615 }
616
617 @Override
618 public boolean cancel() {
619 // only update the state it will be removed from HashedWheelBucket on next tick.
620 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
621 return false;
622 }
623 // If a task should be canceled we put this to another queue which will be processed on each tick.
624 // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
625 // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
626 timer.cancelledTimeouts.add(this);
627 return true;
628 }
629
630 void remove() {
631 HashedWheelBucket bucket = this.bucket;
632 if (bucket != null) {
633 bucket.remove(this);
634 } else {
635 timer.pendingTimeouts.decrementAndGet();
636 }
637 }
638
639 public boolean compareAndSetState(int expected, int state) {
640 return STATE_UPDATER.compareAndSet(this, expected, state);
641 }
642
643 public int state() {
644 return state;
645 }
646
647 @Override
648 public boolean isCancelled() {
649 return state() == ST_CANCELLED;
650 }
651
652 @Override
653 public boolean isExpired() {
654 return state() == ST_EXPIRED;
655 }
656
657 public void expire() {
658 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
659 return;
660 }
661
662 try {
663 task.run(this);
664 } catch (Throwable t) {
665 if (logger.isWarnEnabled()) {
666 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
667 }
668 }
669 }
670
671 @Override
672 public String toString() {
673 final long currentTime = System.nanoTime();
674 long remaining = deadline - currentTime + timer.startTime;
675
676 StringBuilder buf = new StringBuilder(192)
677 .append(simpleClassName(this))
678 .append('(')
679 .append("deadline: ");
680 if (remaining > 0) {
681 buf.append(remaining)
682 .append(" ns later");
683 } else if (remaining < 0) {
684 buf.append(-remaining)
685 .append(" ns ago");
686 } else {
687 buf.append("now");
688 }
689
690 if (isCancelled()) {
691 buf.append(", cancelled");
692 }
693
694 return buf.append(", task: ")
695 .append(task())
696 .append(')')
697 .toString();
698 }
699 }
700
701 /**
702 * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
703 * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
704 * extra object creation is needed.
705 */
706 private static final class HashedWheelBucket {
707 // Used for the linked-list datastructure
708 private HashedWheelTimeout head;
709 private HashedWheelTimeout tail;
710
711 /**
712 * Add {@link HashedWheelTimeout} to this bucket.
713 */
714 public void addTimeout(HashedWheelTimeout timeout) {
715 assert timeout.bucket == null;
716 timeout.bucket = this;
717 if (head == null) {
718 head = tail = timeout;
719 } else {
720 tail.next = timeout;
721 timeout.prev = tail;
722 tail = timeout;
723 }
724 }
725
726 /**
727 * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
728 */
729 public void expireTimeouts(long deadline) {
730 HashedWheelTimeout timeout = head;
731
732 // process all timeouts
733 while (timeout != null) {
734 HashedWheelTimeout next = timeout.next;
735 if (timeout.remainingRounds <= 0) {
736 next = remove(timeout);
737 if (timeout.deadline <= deadline) {
738 timeout.expire();
739 } else {
740 // The timeout was placed into a wrong slot. This should never happen.
741 throw new IllegalStateException(String.format(
742 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
743 }
744 } else if (timeout.isCancelled()) {
745 next = remove(timeout);
746 } else {
747 timeout.remainingRounds --;
748 }
749 timeout = next;
750 }
751 }
752
753 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
754 HashedWheelTimeout next = timeout.next;
755 // remove timeout that was either processed or cancelled by updating the linked-list
756 if (timeout.prev != null) {
757 timeout.prev.next = next;
758 }
759 if (timeout.next != null) {
760 timeout.next.prev = timeout.prev;
761 }
762
763 if (timeout == head) {
764 // if timeout is also the tail we need to adjust the entry too
765 if (timeout == tail) {
766 tail = null;
767 head = null;
768 } else {
769 head = next;
770 }
771 } else if (timeout == tail) {
772 // if the timeout is the tail modify the tail to be the prev node.
773 tail = timeout.prev;
774 }
775 // null out prev, next and bucket to allow for GC.
776 timeout.prev = null;
777 timeout.next = null;
778 timeout.bucket = null;
779 timeout.timer.pendingTimeouts.decrementAndGet();
780 return next;
781 }
782
783 /**
784 * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
785 */
786 public void clearTimeouts(Set<Timeout> set) {
787 for (;;) {
788 HashedWheelTimeout timeout = pollTimeout();
789 if (timeout == null) {
790 return;
791 }
792 if (timeout.isExpired() || timeout.isCancelled()) {
793 continue;
794 }
795 set.add(timeout);
796 }
797 }
798
799 private HashedWheelTimeout pollTimeout() {
800 HashedWheelTimeout head = this.head;
801 if (head == null) {
802 return null;
803 }
804 HashedWheelTimeout next = head.next;
805 if (next == null) {
806 tail = this.head = null;
807 } else {
808 this.head = next;
809 next.prev = null;
810 }
811
812 // null out prev and next to allow for GC.
813 head.next = null;
814 head.prev = null;
815 head.bucket = null;
816 return head;
817 }
818 }
819 }