View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import io.netty.util.internal.PlatformDependent;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.Collections;
23  import java.util.HashSet;
24  import java.util.Queue;
25  import java.util.Set;
26  import java.util.concurrent.CountDownLatch;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import static io.netty.util.internal.StringUtil.simpleClassName;
37  
38  /**
39   * A {@link Timer} optimized for approximated I/O timeout scheduling.
40   *
41   * <h3>Tick Duration</h3>
42   *
43   * As described with 'approximated', this timer does not execute the scheduled
44   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
45   * check if there are any {@link TimerTask}s behind the schedule and execute
46   * them.
47   * <p>
48   * You can increase or decrease the accuracy of the execution timing by
49   * specifying smaller or larger tick duration in the constructor.  In most
50   * network applications, I/O timeout does not need to be accurate.  Therefore,
51   * the default tick duration is 100 milliseconds and you will not need to try
52   * different configurations in most cases.
53   *
54   * <h3>Ticks per Wheel (Wheel Size)</h3>
55   *
56   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
57   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
58   * function is 'dead line of the task'.  The default number of ticks per wheel
59   * (i.e. the size of the wheel) is 512.  You could specify a larger value
60   * if you are going to schedule a lot of timeouts.
61   *
62   * <h3>Do not create many instances.</h3>
63   *
64   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
65   * started.  Therefore, you should make sure to create only one instance and
66   * share it across your application.  One of the common mistakes, that makes
67   * your application unresponsive, is to create a new instance for every connection.
68   *
69   * <h3>Implementation Details</h3>
70   *
71   * {@link HashedWheelTimer} is based on
72   * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
73   * Tony Lauck's paper,
74   * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
75   * and Hierarchical Timing Wheels: data structures to efficiently implement a
76   * timer facility'</a>.  More comprehensive slides are located
77   * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
78   */
79  public class HashedWheelTimer implements Timer {
80  
81      static final InternalLogger logger =
82              InternalLoggerFactory.getInstance(HashedWheelTimer.class);
83  
84      private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
85      private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
86      private static final int INSTANCE_COUNT_LIMIT = 64;
87      private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
88              .newResourceLeakDetector(HashedWheelTimer.class, 1);
89  
90      private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
91              AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
92  
93      private final ResourceLeakTracker<HashedWheelTimer> leak;
94      private final Worker worker = new Worker();
95      private final Thread workerThread;
96  
97      public static final int WORKER_STATE_INIT = 0;
98      public static final int WORKER_STATE_STARTED = 1;
99      public static final int WORKER_STATE_SHUTDOWN = 2;
100     @SuppressWarnings({ "unused", "FieldMayBeFinal" })
101     private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
102 
103     private final long tickDuration;
104     private final HashedWheelBucket[] wheel;
105     private final int mask;
106     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
107     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
108     private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
109     private final AtomicLong pendingTimeouts = new AtomicLong(0);
110     private final long maxPendingTimeouts;
111 
112     private volatile long startTime;
113 
114     /**
115      * Creates a new timer with the default thread factory
116      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
117      * default number of ticks per wheel.
118      */
119     public HashedWheelTimer() {
120         this(Executors.defaultThreadFactory());
121     }
122 
123     /**
124      * Creates a new timer with the default thread factory
125      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
126      * per wheel.
127      *
128      * @param tickDuration   the duration between tick
129      * @param unit           the time unit of the {@code tickDuration}
130      * @throws NullPointerException     if {@code unit} is {@code null}
131      * @throws IllegalArgumentException if {@code tickDuration} is <= 0
132      */
133     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
134         this(Executors.defaultThreadFactory(), tickDuration, unit);
135     }
136 
137     /**
138      * Creates a new timer with the default thread factory
139      * ({@link Executors#defaultThreadFactory()}).
140      *
141      * @param tickDuration   the duration between tick
142      * @param unit           the time unit of the {@code tickDuration}
143      * @param ticksPerWheel  the size of the wheel
144      * @throws NullPointerException     if {@code unit} is {@code null}
145      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
146      */
147     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
148         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
149     }
150 
151     /**
152      * Creates a new timer with the default tick duration and default number of
153      * ticks per wheel.
154      *
155      * @param threadFactory  a {@link ThreadFactory} that creates a
156      *                       background {@link Thread} which is dedicated to
157      *                       {@link TimerTask} execution.
158      * @throws NullPointerException if {@code threadFactory} is {@code null}
159      */
160     public HashedWheelTimer(ThreadFactory threadFactory) {
161         this(threadFactory, 100, TimeUnit.MILLISECONDS);
162     }
163 
164     /**
165      * Creates a new timer with the default number of ticks per wheel.
166      *
167      * @param threadFactory  a {@link ThreadFactory} that creates a
168      *                       background {@link Thread} which is dedicated to
169      *                       {@link TimerTask} execution.
170      * @param tickDuration   the duration between tick
171      * @param unit           the time unit of the {@code tickDuration}
172      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
173      * @throws IllegalArgumentException if {@code tickDuration} is <= 0
174      */
175     public HashedWheelTimer(
176             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
177         this(threadFactory, tickDuration, unit, 512);
178     }
179 
180     /**
181      * Creates a new timer.
182      *
183      * @param threadFactory  a {@link ThreadFactory} that creates a
184      *                       background {@link Thread} which is dedicated to
185      *                       {@link TimerTask} execution.
186      * @param tickDuration   the duration between tick
187      * @param unit           the time unit of the {@code tickDuration}
188      * @param ticksPerWheel  the size of the wheel
189      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
190      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
191      */
192     public HashedWheelTimer(
193             ThreadFactory threadFactory,
194             long tickDuration, TimeUnit unit, int ticksPerWheel) {
195         this(threadFactory, tickDuration, unit, ticksPerWheel, true);
196     }
197 
198     /**
199      * Creates a new timer.
200      *
201      * @param threadFactory        a {@link ThreadFactory} that creates a
202      *                             background {@link Thread} which is dedicated to
203      *                             {@link TimerTask} execution.
204      * @param tickDuration         the duration between tick
205      * @param unit                 the time unit of the {@code tickDuration}
206      * @param ticksPerWheel        the size of the wheel
207      * @param leakDetection        {@code true} if leak detection should be enabled always,
208      *                             if false it will only be enabled if the worker thread is not
209      *                             a daemon thread.
210      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
211      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
212      */
213     public HashedWheelTimer(
214         ThreadFactory threadFactory,
215         long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
216         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
217     }
218 
219     /**
220      * Creates a new timer.
221      *
222      * @param threadFactory        a {@link ThreadFactory} that creates a
223      *                             background {@link Thread} which is dedicated to
224      *                             {@link TimerTask} execution.
225      * @param tickDuration         the duration between tick
226      * @param unit                 the time unit of the {@code tickDuration}
227      * @param ticksPerWheel        the size of the wheel
228      * @param leakDetection        {@code true} if leak detection should be enabled always,
229      *                             if false it will only be enabled if the worker thread is not
230      *                             a daemon thread.
231      * @param  maxPendingTimeouts  The maximum number of pending timeouts after which call to
232      *                             {@code newTimeout} will result in
233      *                             {@link java.util.concurrent.RejectedExecutionException}
234      *                             being thrown. No maximum pending timeouts limit is assumed if
235      *                             this value is 0 or negative.
236      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
237      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
238      */
239     public HashedWheelTimer(
240             ThreadFactory threadFactory,
241             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
242             long maxPendingTimeouts) {
243 
244         if (threadFactory == null) {
245             throw new NullPointerException("threadFactory");
246         }
247         if (unit == null) {
248             throw new NullPointerException("unit");
249         }
250         if (tickDuration <= 0) {
251             throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
252         }
253         if (ticksPerWheel <= 0) {
254             throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
255         }
256 
257         // Normalize ticksPerWheel to power of two and initialize the wheel.
258         wheel = createWheel(ticksPerWheel);
259         mask = wheel.length - 1;
260 
261         // Convert tickDuration to nanos.
262         this.tickDuration = unit.toNanos(tickDuration);
263 
264         // Prevent overflow.
265         if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
266             throw new IllegalArgumentException(String.format(
267                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
268                     tickDuration, Long.MAX_VALUE / wheel.length));
269         }
270         workerThread = threadFactory.newThread(worker);
271 
272         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
273 
274         this.maxPendingTimeouts = maxPendingTimeouts;
275 
276         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
277             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
278             reportTooManyInstances();
279         }
280     }
281 
282     @Override
283     protected void finalize() throws Throwable {
284         try {
285             super.finalize();
286         } finally {
287             // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
288             // we have not yet shutdown then we want to make sure we decrement the active instance count.
289             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
290                 INSTANCE_COUNTER.decrementAndGet();
291             }
292         }
293     }
294 
295     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
296         if (ticksPerWheel <= 0) {
297             throw new IllegalArgumentException(
298                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
299         }
300         if (ticksPerWheel > 1073741824) {
301             throw new IllegalArgumentException(
302                     "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
303         }
304 
305         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
306         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
307         for (int i = 0; i < wheel.length; i ++) {
308             wheel[i] = new HashedWheelBucket();
309         }
310         return wheel;
311     }
312 
313     private static int normalizeTicksPerWheel(int ticksPerWheel) {
314         int normalizedTicksPerWheel = 1;
315         while (normalizedTicksPerWheel < ticksPerWheel) {
316             normalizedTicksPerWheel <<= 1;
317         }
318         return normalizedTicksPerWheel;
319     }
320 
321     /**
322      * Starts the background thread explicitly.  The background thread will
323      * start automatically on demand even if you did not call this method.
324      *
325      * @throws IllegalStateException if this timer has been
326      *                               {@linkplain #stop() stopped} already
327      */
328     public void start() {
329         switch (WORKER_STATE_UPDATER.get(this)) {
330             case WORKER_STATE_INIT:
331                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
332                     workerThread.start();
333                 }
334                 break;
335             case WORKER_STATE_STARTED:
336                 break;
337             case WORKER_STATE_SHUTDOWN:
338                 throw new IllegalStateException("cannot be started once stopped");
339             default:
340                 throw new Error("Invalid WorkerState");
341         }
342 
343         // Wait until the startTime is initialized by the worker.
344         while (startTime == 0) {
345             try {
346                 startTimeInitialized.await();
347             } catch (InterruptedException ignore) {
348                 // Ignore - it will be ready very soon.
349             }
350         }
351     }
352 
353     @Override
354     public Set<Timeout> stop() {
355         if (Thread.currentThread() == workerThread) {
356             throw new IllegalStateException(
357                     HashedWheelTimer.class.getSimpleName() +
358                             ".stop() cannot be called from " +
359                             TimerTask.class.getSimpleName());
360         }
361 
362         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
363             // workerState can be 0 or 2 at this moment - let it always be 2.
364             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
365                 INSTANCE_COUNTER.decrementAndGet();
366                 if (leak != null) {
367                     boolean closed = leak.close(this);
368                     assert closed;
369                 }
370             }
371 
372             return Collections.emptySet();
373         }
374 
375         try {
376             boolean interrupted = false;
377             while (workerThread.isAlive()) {
378                 workerThread.interrupt();
379                 try {
380                     workerThread.join(100);
381                 } catch (InterruptedException ignored) {
382                     interrupted = true;
383                 }
384             }
385 
386             if (interrupted) {
387                 Thread.currentThread().interrupt();
388             }
389         } finally {
390             INSTANCE_COUNTER.decrementAndGet();
391             if (leak != null) {
392                 boolean closed = leak.close(this);
393                 assert closed;
394             }
395         }
396         return worker.unprocessedTimeouts();
397     }
398 
399     @Override
400     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
401         if (task == null) {
402             throw new NullPointerException("task");
403         }
404         if (unit == null) {
405             throw new NullPointerException("unit");
406         }
407 
408         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
409 
410         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
411             pendingTimeouts.decrementAndGet();
412             throw new RejectedExecutionException("Number of pending timeouts ("
413                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
414                 + "timeouts (" + maxPendingTimeouts + ")");
415         }
416 
417         start();
418 
419         // Add the timeout to the timeout queue which will be processed on the next tick.
420         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
421         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
422         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
423         timeouts.add(timeout);
424         return timeout;
425     }
426 
427     /**
428      * Returns the number of pending timeouts of this {@link Timer}.
429      */
430     public long pendingTimeouts() {
431         return pendingTimeouts.get();
432     }
433 
434     private static void reportTooManyInstances() {
435         String resourceType = simpleClassName(HashedWheelTimer.class);
436         logger.error("You are creating too many " + resourceType + " instances. " +
437                 resourceType + " is a shared resource that must be reused across the JVM," +
438                 "so that only a few instances are created.");
439     }
440 
441     private final class Worker implements Runnable {
442         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
443 
444         private long tick;
445 
446         @Override
447         public void run() {
448             // Initialize the startTime.
449             startTime = System.nanoTime();
450             if (startTime == 0) {
451                 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
452                 startTime = 1;
453             }
454 
455             // Notify the other threads waiting for the initialization at start().
456             startTimeInitialized.countDown();
457 
458             do {
459                 final long deadline = waitForNextTick();
460                 if (deadline > 0) {
461                     int idx = (int) (tick & mask);
462                     processCancelledTasks();
463                     HashedWheelBucket bucket =
464                             wheel[idx];
465                     transferTimeoutsToBuckets();
466                     bucket.expireTimeouts(deadline);
467                     tick++;
468                 }
469             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
470 
471             // Fill the unprocessedTimeouts so we can return them from stop() method.
472             for (HashedWheelBucket bucket: wheel) {
473                 bucket.clearTimeouts(unprocessedTimeouts);
474             }
475             for (;;) {
476                 HashedWheelTimeout timeout = timeouts.poll();
477                 if (timeout == null) {
478                     break;
479                 }
480                 if (!timeout.isCancelled()) {
481                     unprocessedTimeouts.add(timeout);
482                 }
483             }
484             processCancelledTasks();
485         }
486 
487         private void transferTimeoutsToBuckets() {
488             // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
489             // adds new timeouts in a loop.
490             for (int i = 0; i < 100000; i++) {
491                 HashedWheelTimeout timeout = timeouts.poll();
492                 if (timeout == null) {
493                     // all processed
494                     break;
495                 }
496                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
497                     // Was cancelled in the meantime.
498                     continue;
499                 }
500 
501                 long calculated = timeout.deadline / tickDuration;
502                 timeout.remainingRounds = (calculated - tick) / wheel.length;
503 
504                 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
505                 int stopIndex = (int) (ticks & mask);
506 
507                 HashedWheelBucket bucket = wheel[stopIndex];
508                 bucket.addTimeout(timeout);
509             }
510         }
511 
512         private void processCancelledTasks() {
513             for (;;) {
514                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
515                 if (timeout == null) {
516                     // all processed
517                     break;
518                 }
519                 try {
520                     timeout.remove();
521                 } catch (Throwable t) {
522                     if (logger.isWarnEnabled()) {
523                         logger.warn("An exception was thrown while process a cancellation task", t);
524                     }
525                 }
526             }
527         }
528 
529         /**
530          * calculate goal nanoTime from startTime and current tick number,
531          * then wait until that goal has been reached.
532          * @return Long.MIN_VALUE if received a shutdown request,
533          * current time otherwise (with Long.MIN_VALUE changed by +1)
534          */
535         private long waitForNextTick() {
536             long deadline = tickDuration * (tick + 1);
537 
538             for (;;) {
539                 final long currentTime = System.nanoTime() - startTime;
540                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
541 
542                 if (sleepTimeMs <= 0) {
543                     if (currentTime == Long.MIN_VALUE) {
544                         return -Long.MAX_VALUE;
545                     } else {
546                         return currentTime;
547                     }
548                 }
549 
550                 // Check if we run on windows, as if thats the case we will need
551                 // to round the sleepTime as workaround for a bug that only affect
552                 // the JVM if it runs on windows.
553                 //
554                 // See https://github.com/netty/netty/issues/356
555                 if (PlatformDependent.isWindows()) {
556                     sleepTimeMs = sleepTimeMs / 10 * 10;
557                 }
558 
559                 try {
560                     Thread.sleep(sleepTimeMs);
561                 } catch (InterruptedException ignored) {
562                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
563                         return Long.MIN_VALUE;
564                     }
565                 }
566             }
567         }
568 
569         public Set<Timeout> unprocessedTimeouts() {
570             return Collections.unmodifiableSet(unprocessedTimeouts);
571         }
572     }
573 
574     private static final class HashedWheelTimeout implements Timeout {
575 
576         private static final int ST_INIT = 0;
577         private static final int ST_CANCELLED = 1;
578         private static final int ST_EXPIRED = 2;
579         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
580                 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
581 
582         private final HashedWheelTimer timer;
583         private final TimerTask task;
584         private final long deadline;
585 
586         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
587         private volatile int state = ST_INIT;
588 
589         // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
590         // HashedWheelTimeout will be added to the correct HashedWheelBucket.
591         long remainingRounds;
592 
593         // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
594         // As only the workerThread will act on it there is no need for synchronization / volatile.
595         HashedWheelTimeout next;
596         HashedWheelTimeout prev;
597 
598         // The bucket to which the timeout was added
599         HashedWheelBucket bucket;
600 
601         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
602             this.timer = timer;
603             this.task = task;
604             this.deadline = deadline;
605         }
606 
607         @Override
608         public Timer timer() {
609             return timer;
610         }
611 
612         @Override
613         public TimerTask task() {
614             return task;
615         }
616 
617         @Override
618         public boolean cancel() {
619             // only update the state it will be removed from HashedWheelBucket on next tick.
620             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
621                 return false;
622             }
623             // If a task should be canceled we put this to another queue which will be processed on each tick.
624             // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
625             // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
626             timer.cancelledTimeouts.add(this);
627             return true;
628         }
629 
630         void remove() {
631             HashedWheelBucket bucket = this.bucket;
632             if (bucket != null) {
633                 bucket.remove(this);
634             } else {
635                 timer.pendingTimeouts.decrementAndGet();
636             }
637         }
638 
639         public boolean compareAndSetState(int expected, int state) {
640             return STATE_UPDATER.compareAndSet(this, expected, state);
641         }
642 
643         public int state() {
644             return state;
645         }
646 
647         @Override
648         public boolean isCancelled() {
649             return state() == ST_CANCELLED;
650         }
651 
652         @Override
653         public boolean isExpired() {
654             return state() == ST_EXPIRED;
655         }
656 
657         public void expire() {
658             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
659                 return;
660             }
661 
662             try {
663                 task.run(this);
664             } catch (Throwable t) {
665                 if (logger.isWarnEnabled()) {
666                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
667                 }
668             }
669         }
670 
671         @Override
672         public String toString() {
673             final long currentTime = System.nanoTime();
674             long remaining = deadline - currentTime + timer.startTime;
675 
676             StringBuilder buf = new StringBuilder(192)
677                .append(simpleClassName(this))
678                .append('(')
679                .append("deadline: ");
680             if (remaining > 0) {
681                 buf.append(remaining)
682                    .append(" ns later");
683             } else if (remaining < 0) {
684                 buf.append(-remaining)
685                    .append(" ns ago");
686             } else {
687                 buf.append("now");
688             }
689 
690             if (isCancelled()) {
691                 buf.append(", cancelled");
692             }
693 
694             return buf.append(", task: ")
695                       .append(task())
696                       .append(')')
697                       .toString();
698         }
699     }
700 
701     /**
702      * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
703      * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
704      * extra object creation is needed.
705      */
706     private static final class HashedWheelBucket {
707         // Used for the linked-list datastructure
708         private HashedWheelTimeout head;
709         private HashedWheelTimeout tail;
710 
711         /**
712          * Add {@link HashedWheelTimeout} to this bucket.
713          */
714         public void addTimeout(HashedWheelTimeout timeout) {
715             assert timeout.bucket == null;
716             timeout.bucket = this;
717             if (head == null) {
718                 head = tail = timeout;
719             } else {
720                 tail.next = timeout;
721                 timeout.prev = tail;
722                 tail = timeout;
723             }
724         }
725 
726         /**
727          * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
728          */
729         public void expireTimeouts(long deadline) {
730             HashedWheelTimeout timeout = head;
731 
732             // process all timeouts
733             while (timeout != null) {
734                 HashedWheelTimeout next = timeout.next;
735                 if (timeout.remainingRounds <= 0) {
736                     next = remove(timeout);
737                     if (timeout.deadline <= deadline) {
738                         timeout.expire();
739                     } else {
740                         // The timeout was placed into a wrong slot. This should never happen.
741                         throw new IllegalStateException(String.format(
742                                 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
743                     }
744                 } else if (timeout.isCancelled()) {
745                     next = remove(timeout);
746                 } else {
747                     timeout.remainingRounds --;
748                 }
749                 timeout = next;
750             }
751         }
752 
753         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
754             HashedWheelTimeout next = timeout.next;
755             // remove timeout that was either processed or cancelled by updating the linked-list
756             if (timeout.prev != null) {
757                 timeout.prev.next = next;
758             }
759             if (timeout.next != null) {
760                 timeout.next.prev = timeout.prev;
761             }
762 
763             if (timeout == head) {
764                 // if timeout is also the tail we need to adjust the entry too
765                 if (timeout == tail) {
766                     tail = null;
767                     head = null;
768                 } else {
769                     head = next;
770                 }
771             } else if (timeout == tail) {
772                 // if the timeout is the tail modify the tail to be the prev node.
773                 tail = timeout.prev;
774             }
775             // null out prev, next and bucket to allow for GC.
776             timeout.prev = null;
777             timeout.next = null;
778             timeout.bucket = null;
779             timeout.timer.pendingTimeouts.decrementAndGet();
780             return next;
781         }
782 
783         /**
784          * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
785          */
786         public void clearTimeouts(Set<Timeout> set) {
787             for (;;) {
788                 HashedWheelTimeout timeout = pollTimeout();
789                 if (timeout == null) {
790                     return;
791                 }
792                 if (timeout.isExpired() || timeout.isCancelled()) {
793                     continue;
794                 }
795                 set.add(timeout);
796             }
797         }
798 
799         private HashedWheelTimeout pollTimeout() {
800             HashedWheelTimeout head = this.head;
801             if (head == null) {
802                 return null;
803             }
804             HashedWheelTimeout next = head.next;
805             if (next == null) {
806                 tail = this.head =  null;
807             } else {
808                 this.head = next;
809                 next.prev = null;
810             }
811 
812             // null out prev and next to allow for GC.
813             head.next = null;
814             head.prev = null;
815             head.bucket = null;
816             return head;
817         }
818     }
819 }