View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import io.netty.util.internal.PlatformDependent;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.Collections;
23  import java.util.HashSet;
24  import java.util.Queue;
25  import java.util.Set;
26  import java.util.concurrent.CountDownLatch;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import static io.netty.util.internal.StringUtil.simpleClassName;
37  
38  /**
39   * A {@link Timer} optimized for approximated I/O timeout scheduling.
40   *
41   * <h3>Tick Duration</h3>
42   *
43   * As described with 'approximated', this timer does not execute the scheduled
44   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
45   * check if there are any {@link TimerTask}s behind the schedule and execute
46   * them.
47   * <p>
48   * You can increase or decrease the accuracy of the execution timing by
49   * specifying smaller or larger tick duration in the constructor.  In most
50   * network applications, I/O timeout does not need to be accurate.  Therefore,
51   * the default tick duration is 100 milliseconds and you will not need to try
52   * different configurations in most cases.
53   *
54   * <h3>Ticks per Wheel (Wheel Size)</h3>
55   *
56   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
57   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
58   * function is 'dead line of the task'.  The default number of ticks per wheel
59   * (i.e. the size of the wheel) is 512.  You could specify a larger value
60   * if you are going to schedule a lot of timeouts.
61   *
62   * <h3>Do not create many instances.</h3>
63   *
64   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
65   * started.  Therefore, you should make sure to create only one instance and
66   * share it across your application.  One of the common mistakes, that makes
67   * your application unresponsive, is to create a new instance for every connection.
68   *
69   * <h3>Implementation Details</h3>
70   *
71   * {@link HashedWheelTimer} is based on
72   * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
73   * Tony Lauck's paper,
74   * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
75   * and Hierarchical Timing Wheels: data structures to efficiently implement a
76   * timer facility'</a>.  More comprehensive slides are located
77   * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
78   */
79  public class HashedWheelTimer implements Timer {
80  
81      static final InternalLogger logger =
82              InternalLoggerFactory.getInstance(HashedWheelTimer.class);
83  
84      private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
85      private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
86      private static final int INSTANCE_COUNT_LIMIT = 64;
87      private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
88              .newResourceLeakDetector(HashedWheelTimer.class, 1);
89  
90      private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
91              AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
92  
93      private final ResourceLeakTracker<HashedWheelTimer> leak;
94      private final Worker worker = new Worker();
95      private final Thread workerThread;
96  
97      public static final int WORKER_STATE_INIT = 0;
98      public static final int WORKER_STATE_STARTED = 1;
99      public static final int WORKER_STATE_SHUTDOWN = 2;
100     @SuppressWarnings({ "unused", "FieldMayBeFinal" })
101     private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
102 
103     private final long tickDuration;
104     private final HashedWheelBucket[] wheel;
105     private final int mask;
106     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
107     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
108     private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
109     private final AtomicLong pendingTimeouts = new AtomicLong(0);
110     private final long maxPendingTimeouts;
111 
112     private volatile long startTime;
113 
114     /**
115      * Creates a new timer with the default thread factory
116      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
117      * default number of ticks per wheel.
118      */
119     public HashedWheelTimer() {
120         this(Executors.defaultThreadFactory());
121     }
122 
123     /**
124      * Creates a new timer with the default thread factory
125      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
126      * per wheel.
127      *
128      * @param tickDuration   the duration between tick
129      * @param unit           the time unit of the {@code tickDuration}
130      * @throws NullPointerException     if {@code unit} is {@code null}
131      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
132      */
133     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
134         this(Executors.defaultThreadFactory(), tickDuration, unit);
135     }
136 
137     /**
138      * Creates a new timer with the default thread factory
139      * ({@link Executors#defaultThreadFactory()}).
140      *
141      * @param tickDuration   the duration between tick
142      * @param unit           the time unit of the {@code tickDuration}
143      * @param ticksPerWheel  the size of the wheel
144      * @throws NullPointerException     if {@code unit} is {@code null}
145      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
146      */
147     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
148         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
149     }
150 
151     /**
152      * Creates a new timer with the default tick duration and default number of
153      * ticks per wheel.
154      *
155      * @param threadFactory  a {@link ThreadFactory} that creates a
156      *                       background {@link Thread} which is dedicated to
157      *                       {@link TimerTask} execution.
158      * @throws NullPointerException if {@code threadFactory} is {@code null}
159      */
160     public HashedWheelTimer(ThreadFactory threadFactory) {
161         this(threadFactory, 100, TimeUnit.MILLISECONDS);
162     }
163 
164     /**
165      * Creates a new timer with the default number of ticks per wheel.
166      *
167      * @param threadFactory  a {@link ThreadFactory} that creates a
168      *                       background {@link Thread} which is dedicated to
169      *                       {@link TimerTask} execution.
170      * @param tickDuration   the duration between tick
171      * @param unit           the time unit of the {@code tickDuration}
172      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
173      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
174      */
175     public HashedWheelTimer(
176             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
177         this(threadFactory, tickDuration, unit, 512);
178     }
179 
180     /**
181      * Creates a new timer.
182      *
183      * @param threadFactory  a {@link ThreadFactory} that creates a
184      *                       background {@link Thread} which is dedicated to
185      *                       {@link TimerTask} execution.
186      * @param tickDuration   the duration between tick
187      * @param unit           the time unit of the {@code tickDuration}
188      * @param ticksPerWheel  the size of the wheel
189      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
190      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
191      */
192     public HashedWheelTimer(
193             ThreadFactory threadFactory,
194             long tickDuration, TimeUnit unit, int ticksPerWheel) {
195         this(threadFactory, tickDuration, unit, ticksPerWheel, true);
196     }
197 
198     /**
199      * Creates a new timer.
200      *
201      * @param threadFactory        a {@link ThreadFactory} that creates a
202      *                             background {@link Thread} which is dedicated to
203      *                             {@link TimerTask} execution.
204      * @param tickDuration         the duration between tick
205      * @param unit                 the time unit of the {@code tickDuration}
206      * @param ticksPerWheel        the size of the wheel
207      * @param leakDetection        {@code true} if leak detection should be enabled always,
208      *                             if false it will only be enabled if the worker thread is not
209      *                             a daemon thread.
210      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
211      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
212      */
213     public HashedWheelTimer(
214         ThreadFactory threadFactory,
215         long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
216         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
217     }
218 
219     /**
220      * Creates a new timer.
221      *
222      * @param threadFactory        a {@link ThreadFactory} that creates a
223      *                             background {@link Thread} which is dedicated to
224      *                             {@link TimerTask} execution.
225      * @param tickDuration         the duration between tick
226      * @param unit                 the time unit of the {@code tickDuration}
227      * @param ticksPerWheel        the size of the wheel
228      * @param leakDetection        {@code true} if leak detection should be enabled always,
229      *                             if false it will only be enabled if the worker thread is not
230      *                             a daemon thread.
231      * @param  maxPendingTimeouts  The maximum number of pending timeouts after which call to
232      *                             {@code newTimeout} will result in
233      *                             {@link java.util.concurrent.RejectedExecutionException}
234      *                             being thrown. No maximum pending timeouts limit is assumed if
235      *                             this value is 0 or negative.
236      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
237      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
238      */
239     public HashedWheelTimer(
240             ThreadFactory threadFactory,
241             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
242             long maxPendingTimeouts) {
243 
244         if (threadFactory == null) {
245             throw new NullPointerException("threadFactory");
246         }
247         if (unit == null) {
248             throw new NullPointerException("unit");
249         }
250         if (tickDuration <= 0) {
251             throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
252         }
253         if (ticksPerWheel <= 0) {
254             throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
255         }
256 
257         // Normalize ticksPerWheel to power of two and initialize the wheel.
258         wheel = createWheel(ticksPerWheel);
259         mask = wheel.length - 1;
260 
261         // Convert tickDuration to nanos.
262         this.tickDuration = unit.toNanos(tickDuration);
263 
264         // Prevent overflow.
265         if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
266             throw new IllegalArgumentException(String.format(
267                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
268                     tickDuration, Long.MAX_VALUE / wheel.length));
269         }
270         workerThread = threadFactory.newThread(worker);
271 
272         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
273 
274         this.maxPendingTimeouts = maxPendingTimeouts;
275 
276         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
277             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
278             reportTooManyInstances();
279         }
280     }
281 
282     @Override
283     protected void finalize() throws Throwable {
284         try {
285             super.finalize();
286         } finally {
287             // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
288             // we have not yet shutdown then we want to make sure we decrement the active instance count.
289             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
290                 INSTANCE_COUNTER.decrementAndGet();
291             }
292         }
293     }
294 
295     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
296         if (ticksPerWheel <= 0) {
297             throw new IllegalArgumentException(
298                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
299         }
300         if (ticksPerWheel > 1073741824) {
301             throw new IllegalArgumentException(
302                     "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
303         }
304 
305         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
306         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
307         for (int i = 0; i < wheel.length; i ++) {
308             wheel[i] = new HashedWheelBucket();
309         }
310         return wheel;
311     }
312 
313     private static int normalizeTicksPerWheel(int ticksPerWheel) {
314         int normalizedTicksPerWheel = 1;
315         while (normalizedTicksPerWheel < ticksPerWheel) {
316             normalizedTicksPerWheel <<= 1;
317         }
318         return normalizedTicksPerWheel;
319     }
320 
321     /**
322      * Starts the background thread explicitly.  The background thread will
323      * start automatically on demand even if you did not call this method.
324      *
325      * @throws IllegalStateException if this timer has been
326      *                               {@linkplain #stop() stopped} already
327      */
328     public void start() {
329         switch (WORKER_STATE_UPDATER.get(this)) {
330             case WORKER_STATE_INIT:
331                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
332                     workerThread.start();
333                 }
334                 break;
335             case WORKER_STATE_STARTED:
336                 break;
337             case WORKER_STATE_SHUTDOWN:
338                 throw new IllegalStateException("cannot be started once stopped");
339             default:
340                 throw new Error("Invalid WorkerState");
341         }
342 
343         // Wait until the startTime is initialized by the worker.
344         while (startTime == 0) {
345             try {
346                 startTimeInitialized.await();
347             } catch (InterruptedException ignore) {
348                 // Ignore - it will be ready very soon.
349             }
350         }
351     }
352 
353     @Override
354     public Set<Timeout> stop() {
355         if (Thread.currentThread() == workerThread) {
356             throw new IllegalStateException(
357                     HashedWheelTimer.class.getSimpleName() +
358                             ".stop() cannot be called from " +
359                             TimerTask.class.getSimpleName());
360         }
361 
362         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
363             // workerState can be 0 or 2 at this moment - let it always be 2.
364             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
365                 INSTANCE_COUNTER.decrementAndGet();
366                 if (leak != null) {
367                     boolean closed = leak.close(this);
368                     assert closed;
369                 }
370             }
371 
372             return Collections.emptySet();
373         }
374 
375         try {
376             boolean interrupted = false;
377             while (workerThread.isAlive()) {
378                 workerThread.interrupt();
379                 try {
380                     workerThread.join(100);
381                 } catch (InterruptedException ignored) {
382                     interrupted = true;
383                 }
384             }
385 
386             if (interrupted) {
387                 Thread.currentThread().interrupt();
388             }
389         } finally {
390             INSTANCE_COUNTER.decrementAndGet();
391             if (leak != null) {
392                 boolean closed = leak.close(this);
393                 assert closed;
394             }
395         }
396         return worker.unprocessedTimeouts();
397     }
398 
399     @Override
400     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
401         if (task == null) {
402             throw new NullPointerException("task");
403         }
404         if (unit == null) {
405             throw new NullPointerException("unit");
406         }
407 
408         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
409 
410         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
411             pendingTimeouts.decrementAndGet();
412             throw new RejectedExecutionException("Number of pending timeouts ("
413                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
414                 + "timeouts (" + maxPendingTimeouts + ")");
415         }
416 
417         start();
418 
419         // Add the timeout to the timeout queue which will be processed on the next tick.
420         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
421         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
422 
423         // Guard against overflow.
424         if (delay > 0 && deadline < 0) {
425             deadline = Long.MAX_VALUE;
426         }
427         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
428         timeouts.add(timeout);
429         return timeout;
430     }
431 
432     /**
433      * Returns the number of pending timeouts of this {@link Timer}.
434      */
435     public long pendingTimeouts() {
436         return pendingTimeouts.get();
437     }
438 
439     private static void reportTooManyInstances() {
440         String resourceType = simpleClassName(HashedWheelTimer.class);
441         logger.error("You are creating too many " + resourceType + " instances. " +
442                 resourceType + " is a shared resource that must be reused across the JVM," +
443                 "so that only a few instances are created.");
444     }
445 
446     private final class Worker implements Runnable {
447         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
448 
449         private long tick;
450 
451         @Override
452         public void run() {
453             // Initialize the startTime.
454             startTime = System.nanoTime();
455             if (startTime == 0) {
456                 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
457                 startTime = 1;
458             }
459 
460             // Notify the other threads waiting for the initialization at start().
461             startTimeInitialized.countDown();
462 
463             do {
464                 final long deadline = waitForNextTick();
465                 if (deadline > 0) {
466                     int idx = (int) (tick & mask);
467                     processCancelledTasks();
468                     HashedWheelBucket bucket =
469                             wheel[idx];
470                     transferTimeoutsToBuckets();
471                     bucket.expireTimeouts(deadline);
472                     tick++;
473                 }
474             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
475 
476             // Fill the unprocessedTimeouts so we can return them from stop() method.
477             for (HashedWheelBucket bucket: wheel) {
478                 bucket.clearTimeouts(unprocessedTimeouts);
479             }
480             for (;;) {
481                 HashedWheelTimeout timeout = timeouts.poll();
482                 if (timeout == null) {
483                     break;
484                 }
485                 if (!timeout.isCancelled()) {
486                     unprocessedTimeouts.add(timeout);
487                 }
488             }
489             processCancelledTasks();
490         }
491 
492         private void transferTimeoutsToBuckets() {
493             // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
494             // adds new timeouts in a loop.
495             for (int i = 0; i < 100000; i++) {
496                 HashedWheelTimeout timeout = timeouts.poll();
497                 if (timeout == null) {
498                     // all processed
499                     break;
500                 }
501                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
502                     // Was cancelled in the meantime.
503                     continue;
504                 }
505 
506                 long calculated = timeout.deadline / tickDuration;
507                 timeout.remainingRounds = (calculated - tick) / wheel.length;
508 
509                 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
510                 int stopIndex = (int) (ticks & mask);
511 
512                 HashedWheelBucket bucket = wheel[stopIndex];
513                 bucket.addTimeout(timeout);
514             }
515         }
516 
517         private void processCancelledTasks() {
518             for (;;) {
519                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
520                 if (timeout == null) {
521                     // all processed
522                     break;
523                 }
524                 try {
525                     timeout.remove();
526                 } catch (Throwable t) {
527                     if (logger.isWarnEnabled()) {
528                         logger.warn("An exception was thrown while process a cancellation task", t);
529                     }
530                 }
531             }
532         }
533 
534         /**
535          * calculate goal nanoTime from startTime and current tick number,
536          * then wait until that goal has been reached.
537          * @return Long.MIN_VALUE if received a shutdown request,
538          * current time otherwise (with Long.MIN_VALUE changed by +1)
539          */
540         private long waitForNextTick() {
541             long deadline = tickDuration * (tick + 1);
542 
543             for (;;) {
544                 final long currentTime = System.nanoTime() - startTime;
545                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
546 
547                 if (sleepTimeMs <= 0) {
548                     if (currentTime == Long.MIN_VALUE) {
549                         return -Long.MAX_VALUE;
550                     } else {
551                         return currentTime;
552                     }
553                 }
554 
555                 // Check if we run on windows, as if thats the case we will need
556                 // to round the sleepTime as workaround for a bug that only affect
557                 // the JVM if it runs on windows.
558                 //
559                 // See https://github.com/netty/netty/issues/356
560                 if (PlatformDependent.isWindows()) {
561                     sleepTimeMs = sleepTimeMs / 10 * 10;
562                 }
563 
564                 try {
565                     Thread.sleep(sleepTimeMs);
566                 } catch (InterruptedException ignored) {
567                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
568                         return Long.MIN_VALUE;
569                     }
570                 }
571             }
572         }
573 
574         public Set<Timeout> unprocessedTimeouts() {
575             return Collections.unmodifiableSet(unprocessedTimeouts);
576         }
577     }
578 
579     private static final class HashedWheelTimeout implements Timeout {
580 
581         private static final int ST_INIT = 0;
582         private static final int ST_CANCELLED = 1;
583         private static final int ST_EXPIRED = 2;
584         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
585                 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
586 
587         private final HashedWheelTimer timer;
588         private final TimerTask task;
589         private final long deadline;
590 
591         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
592         private volatile int state = ST_INIT;
593 
594         // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
595         // HashedWheelTimeout will be added to the correct HashedWheelBucket.
596         long remainingRounds;
597 
598         // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
599         // As only the workerThread will act on it there is no need for synchronization / volatile.
600         HashedWheelTimeout next;
601         HashedWheelTimeout prev;
602 
603         // The bucket to which the timeout was added
604         HashedWheelBucket bucket;
605 
606         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
607             this.timer = timer;
608             this.task = task;
609             this.deadline = deadline;
610         }
611 
612         @Override
613         public Timer timer() {
614             return timer;
615         }
616 
617         @Override
618         public TimerTask task() {
619             return task;
620         }
621 
622         @Override
623         public boolean cancel() {
624             // only update the state it will be removed from HashedWheelBucket on next tick.
625             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
626                 return false;
627             }
628             // If a task should be canceled we put this to another queue which will be processed on each tick.
629             // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
630             // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
631             timer.cancelledTimeouts.add(this);
632             return true;
633         }
634 
635         void remove() {
636             HashedWheelBucket bucket = this.bucket;
637             if (bucket != null) {
638                 bucket.remove(this);
639             } else {
640                 timer.pendingTimeouts.decrementAndGet();
641             }
642         }
643 
644         public boolean compareAndSetState(int expected, int state) {
645             return STATE_UPDATER.compareAndSet(this, expected, state);
646         }
647 
648         public int state() {
649             return state;
650         }
651 
652         @Override
653         public boolean isCancelled() {
654             return state() == ST_CANCELLED;
655         }
656 
657         @Override
658         public boolean isExpired() {
659             return state() == ST_EXPIRED;
660         }
661 
662         public void expire() {
663             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
664                 return;
665             }
666 
667             try {
668                 task.run(this);
669             } catch (Throwable t) {
670                 if (logger.isWarnEnabled()) {
671                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
672                 }
673             }
674         }
675 
676         @Override
677         public String toString() {
678             final long currentTime = System.nanoTime();
679             long remaining = deadline - currentTime + timer.startTime;
680 
681             StringBuilder buf = new StringBuilder(192)
682                .append(simpleClassName(this))
683                .append('(')
684                .append("deadline: ");
685             if (remaining > 0) {
686                 buf.append(remaining)
687                    .append(" ns later");
688             } else if (remaining < 0) {
689                 buf.append(-remaining)
690                    .append(" ns ago");
691             } else {
692                 buf.append("now");
693             }
694 
695             if (isCancelled()) {
696                 buf.append(", cancelled");
697             }
698 
699             return buf.append(", task: ")
700                       .append(task())
701                       .append(')')
702                       .toString();
703         }
704     }
705 
706     /**
707      * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
708      * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
709      * extra object creation is needed.
710      */
711     private static final class HashedWheelBucket {
712         // Used for the linked-list datastructure
713         private HashedWheelTimeout head;
714         private HashedWheelTimeout tail;
715 
716         /**
717          * Add {@link HashedWheelTimeout} to this bucket.
718          */
719         public void addTimeout(HashedWheelTimeout timeout) {
720             assert timeout.bucket == null;
721             timeout.bucket = this;
722             if (head == null) {
723                 head = tail = timeout;
724             } else {
725                 tail.next = timeout;
726                 timeout.prev = tail;
727                 tail = timeout;
728             }
729         }
730 
731         /**
732          * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
733          */
734         public void expireTimeouts(long deadline) {
735             HashedWheelTimeout timeout = head;
736 
737             // process all timeouts
738             while (timeout != null) {
739                 HashedWheelTimeout next = timeout.next;
740                 if (timeout.remainingRounds <= 0) {
741                     next = remove(timeout);
742                     if (timeout.deadline <= deadline) {
743                         timeout.expire();
744                     } else {
745                         // The timeout was placed into a wrong slot. This should never happen.
746                         throw new IllegalStateException(String.format(
747                                 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
748                     }
749                 } else if (timeout.isCancelled()) {
750                     next = remove(timeout);
751                 } else {
752                     timeout.remainingRounds --;
753                 }
754                 timeout = next;
755             }
756         }
757 
758         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
759             HashedWheelTimeout next = timeout.next;
760             // remove timeout that was either processed or cancelled by updating the linked-list
761             if (timeout.prev != null) {
762                 timeout.prev.next = next;
763             }
764             if (timeout.next != null) {
765                 timeout.next.prev = timeout.prev;
766             }
767 
768             if (timeout == head) {
769                 // if timeout is also the tail we need to adjust the entry too
770                 if (timeout == tail) {
771                     tail = null;
772                     head = null;
773                 } else {
774                     head = next;
775                 }
776             } else if (timeout == tail) {
777                 // if the timeout is the tail modify the tail to be the prev node.
778                 tail = timeout.prev;
779             }
780             // null out prev, next and bucket to allow for GC.
781             timeout.prev = null;
782             timeout.next = null;
783             timeout.bucket = null;
784             timeout.timer.pendingTimeouts.decrementAndGet();
785             return next;
786         }
787 
788         /**
789          * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
790          */
791         public void clearTimeouts(Set<Timeout> set) {
792             for (;;) {
793                 HashedWheelTimeout timeout = pollTimeout();
794                 if (timeout == null) {
795                     return;
796                 }
797                 if (timeout.isExpired() || timeout.isCancelled()) {
798                     continue;
799                 }
800                 set.add(timeout);
801             }
802         }
803 
804         private HashedWheelTimeout pollTimeout() {
805             HashedWheelTimeout head = this.head;
806             if (head == null) {
807                 return null;
808             }
809             HashedWheelTimeout next = head.next;
810             if (next == null) {
811                 tail = this.head =  null;
812             } else {
813                 this.head = next;
814                 next.prev = null;
815             }
816 
817             // null out prev and next to allow for GC.
818             head.next = null;
819             head.prev = null;
820             head.bucket = null;
821             return head;
822         }
823     }
824 }