View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import static io.netty.util.internal.ObjectUtil.checkInRange;
19  import static io.netty.util.internal.ObjectUtil.checkPositive;
20  import static io.netty.util.internal.ObjectUtil.checkNotNull;
21  
22  import io.netty.util.concurrent.ImmediateExecutor;
23  import io.netty.util.internal.MathUtil;
24  import io.netty.util.internal.PlatformDependent;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  import java.util.Collections;
29  import java.util.HashSet;
30  import java.util.Queue;
31  import java.util.Set;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.Executor;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.RejectedExecutionException;
36  import java.util.concurrent.ThreadFactory;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
41  import java.util.concurrent.atomic.AtomicLong;
42  
43  import static io.netty.util.internal.StringUtil.simpleClassName;
44  
45  /**
46   * A {@link Timer} optimized for approximated I/O timeout scheduling.
47   *
48   * <h3>Tick Duration</h3>
49   *
50   * As described with 'approximated', this timer does not execute the scheduled
51   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
52   * check if there are any {@link TimerTask}s behind the schedule and execute
53   * them.
54   * <p>
55   * You can increase or decrease the accuracy of the execution timing by
56   * specifying smaller or larger tick duration in the constructor.  In most
57   * network applications, I/O timeout does not need to be accurate.  Therefore,
58   * the default tick duration is 100 milliseconds and you will not need to try
59   * different configurations in most cases.
60   *
61   * <h3>Ticks per Wheel (Wheel Size)</h3>
62   *
63   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
64   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
65   * function is 'dead line of the task'.  The default number of ticks per wheel
66   * (i.e. the size of the wheel) is 512.  You could specify a larger value
67   * if you are going to schedule a lot of timeouts.
68   *
69   * <h3>Do not create many instances.</h3>
70   *
71   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
72   * started.  Therefore, you should make sure to create only one instance and
73   * share it across your application.  One of the common mistakes, that makes
74   * your application unresponsive, is to create a new instance for every connection.
75   *
76   * <h3>Implementation Details</h3>
77   *
78   * {@link HashedWheelTimer} is based on
79   * <a href="https://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
80   * Tony Lauck's paper,
81   * <a href="https://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
82   * and Hierarchical Timing Wheels: data structures to efficiently implement a
83   * timer facility'</a>.  More comprehensive slides are located
84   * <a href="https://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
85   */
86  public class HashedWheelTimer implements Timer {
87  
    static final InternalLogger logger =
            InternalLoggerFactory.getInstance(HashedWheelTimer.class);

    // Number of timer instances that have been constructed and not yet stopped or finalized.
    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
    // Set to true by the first constructor call that exceeds the limit, so the error is reported only once.
    private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
    // Instance count above which the constructor reports that too many timers exist.
    private static final int INSTANCE_COUNT_LIMIT = 64;
    // One millisecond expressed in nanoseconds; the constructor uses it as the smallest tick duration.
    private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
    // NOTE(review): the second argument (1) is presumably the sampling interval - confirm against
    // ResourceLeakDetectorFactory.
    private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
            .newResourceLeakDetector(HashedWheelTimer.class, 1);

    // Atomic accessor for the volatile workerState field declared below.
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");

    // Leak tracker for this instance, or null when the constructor decided not to track it.
    private final ResourceLeakTracker<HashedWheelTimer> leak;
    // The tick loop; it is the Runnable handed to the thread factory to create workerThread.
    private final Worker worker = new Worker();
    private final Thread workerThread;

    // Values of workerState.
    public static final int WORKER_STATE_INIT = 0;
    public static final int WORKER_STATE_STARTED = 1;
    public static final int WORKER_STATE_SHUTDOWN = 2;
    // Only accessed through WORKER_STATE_UPDATER, hence the suppressed inspections.
    @SuppressWarnings({"unused", "FieldMayBeFinal"})
    private volatile int workerState; // 0 - init, 1 - started, 2 - shut down

    // Length of one tick in nanoseconds (never less than MILLISECOND_NANOS).
    private final long tickDuration;
    // The wheel itself; its length is a power of two.
    private final HashedWheelBucket[] wheel;
    // wheel.length - 1, used to map a tick number to a bucket index with a bitwise AND.
    private final int mask;
    // Counted down by the worker once startTime has been set; start() waits on it.
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    // Newly created timeouts, waiting for the worker to move them into a bucket.
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    // Timeouts that were cancelled, waiting for the worker to remove them.
    private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
    // Counter reported by pendingTimeouts() and checked against maxPendingTimeouts in newTimeout().
    private final AtomicLong pendingTimeouts = new AtomicLong(0);
    // Upper bound for pendingTimeouts; a value of 0 or less disables the limit.
    private final long maxPendingTimeouts;
    // Executor that runs expired tasks.
    private final Executor taskExecutor;

    // System.nanoTime() captured by the worker when it starts; 0 means "not initialized yet".
    private volatile long startTime;
122 
123     /**
124      * Creates a new timer with the default thread factory
125      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
126      * default number of ticks per wheel.
127      */
128     public HashedWheelTimer() {
129         this(Executors.defaultThreadFactory());
130     }
131 
132     /**
133      * Creates a new timer with the default thread factory
134      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
135      * per wheel.
136      *
137      * @param tickDuration the duration between tick
138      * @param unit         the time unit of the {@code tickDuration}
139      * @throws NullPointerException     if {@code unit} is {@code null}
140      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
141      */
142     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
143         this(Executors.defaultThreadFactory(), tickDuration, unit);
144     }
145 
146     /**
147      * Creates a new timer with the default thread factory
148      * ({@link Executors#defaultThreadFactory()}).
149      *
150      * @param tickDuration  the duration between tick
151      * @param unit          the time unit of the {@code tickDuration}
152      * @param ticksPerWheel the size of the wheel
153      * @throws NullPointerException     if {@code unit} is {@code null}
154      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
155      */
156     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
157         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
158     }
159 
160     /**
161      * Creates a new timer with the default tick duration and default number of
162      * ticks per wheel.
163      *
164      * @param threadFactory a {@link ThreadFactory} that creates a
165      *                      background {@link Thread} which is dedicated to
166      *                      {@link TimerTask} execution.
167      * @throws NullPointerException if {@code threadFactory} is {@code null}
168      */
169     public HashedWheelTimer(ThreadFactory threadFactory) {
170         this(threadFactory, 100, TimeUnit.MILLISECONDS);
171     }
172 
173     /**
174      * Creates a new timer with the default number of ticks per wheel.
175      *
176      * @param threadFactory a {@link ThreadFactory} that creates a
177      *                      background {@link Thread} which is dedicated to
178      *                      {@link TimerTask} execution.
179      * @param tickDuration  the duration between tick
180      * @param unit          the time unit of the {@code tickDuration}
181      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
182      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
183      */
184     public HashedWheelTimer(
185             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
186         this(threadFactory, tickDuration, unit, 512);
187     }
188 
189     /**
190      * Creates a new timer.
191      *
192      * @param threadFactory a {@link ThreadFactory} that creates a
193      *                      background {@link Thread} which is dedicated to
194      *                      {@link TimerTask} execution.
195      * @param tickDuration  the duration between tick
196      * @param unit          the time unit of the {@code tickDuration}
197      * @param ticksPerWheel the size of the wheel
198      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
199      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
200      */
201     public HashedWheelTimer(
202             ThreadFactory threadFactory,
203             long tickDuration, TimeUnit unit, int ticksPerWheel) {
204         this(threadFactory, tickDuration, unit, ticksPerWheel, true);
205     }
206 
207     /**
208      * Creates a new timer.
209      *
210      * @param threadFactory a {@link ThreadFactory} that creates a
211      *                      background {@link Thread} which is dedicated to
212      *                      {@link TimerTask} execution.
213      * @param tickDuration  the duration between tick
214      * @param unit          the time unit of the {@code tickDuration}
215      * @param ticksPerWheel the size of the wheel
216      * @param leakDetection {@code true} if leak detection should be enabled always,
217      *                      if false it will only be enabled if the worker thread is not
218      *                      a daemon thread.
219      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
220      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
221      */
222     public HashedWheelTimer(
223             ThreadFactory threadFactory,
224             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
225         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
226     }
227 
228     /**
229      * Creates a new timer.
230      *
231      * @param threadFactory        a {@link ThreadFactory} that creates a
232      *                             background {@link Thread} which is dedicated to
233      *                             {@link TimerTask} execution.
234      * @param tickDuration         the duration between tick
235      * @param unit                 the time unit of the {@code tickDuration}
236      * @param ticksPerWheel        the size of the wheel
237      * @param leakDetection        {@code true} if leak detection should be enabled always,
238      *                             if false it will only be enabled if the worker thread is not
239      *                             a daemon thread.
240      * @param  maxPendingTimeouts  The maximum number of pending timeouts after which call to
241      *                             {@code newTimeout} will result in
242      *                             {@link java.util.concurrent.RejectedExecutionException}
243      *                             being thrown. No maximum pending timeouts limit is assumed if
244      *                             this value is 0 or negative.
245      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
246      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
247      */
248     public HashedWheelTimer(
249             ThreadFactory threadFactory,
250             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
251             long maxPendingTimeouts) {
252         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
253                 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
254     }
255     /**
256      * Creates a new timer.
257      *
258      * @param threadFactory        a {@link ThreadFactory} that creates a
259      *                             background {@link Thread} which is dedicated to
260      *                             {@link TimerTask} execution.
261      * @param tickDuration         the duration between tick
262      * @param unit                 the time unit of the {@code tickDuration}
263      * @param ticksPerWheel        the size of the wheel
264      * @param leakDetection        {@code true} if leak detection should be enabled always,
265      *                             if false it will only be enabled if the worker thread is not
266      *                             a daemon thread.
267      * @param maxPendingTimeouts   The maximum number of pending timeouts after which call to
268      *                             {@code newTimeout} will result in
269      *                             {@link java.util.concurrent.RejectedExecutionException}
270      *                             being thrown. No maximum pending timeouts limit is assumed if
271      *                             this value is 0 or negative.
272      * @param taskExecutor         The {@link Executor} that is used to execute the submitted {@link TimerTask}s.
273      *                             The caller is responsible to shutdown the {@link Executor} once it is not needed
274      *                             anymore.
275      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
276      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
277      */
278     public HashedWheelTimer(
279             ThreadFactory threadFactory,
280             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
281             long maxPendingTimeouts, Executor taskExecutor) {
282 
283         checkNotNull(threadFactory, "threadFactory");
284         checkNotNull(unit, "unit");
285         checkPositive(tickDuration, "tickDuration");
286         checkPositive(ticksPerWheel, "ticksPerWheel");
287         this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
288 
289         // Normalize ticksPerWheel to power of two and initialize the wheel.
290         wheel = createWheel(ticksPerWheel);
291         mask = wheel.length - 1;
292 
293         // Convert tickDuration to nanos.
294         long duration = unit.toNanos(tickDuration);
295 
296         // Prevent overflow.
297         if (duration >= Long.MAX_VALUE / wheel.length) {
298             throw new IllegalArgumentException(String.format(
299                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
300                     tickDuration, Long.MAX_VALUE / wheel.length));
301         }
302 
303         if (duration < MILLISECOND_NANOS) {
304             logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
305                         tickDuration, MILLISECOND_NANOS);
306             this.tickDuration = MILLISECOND_NANOS;
307         } else {
308             this.tickDuration = duration;
309         }
310 
311         workerThread = threadFactory.newThread(worker);
312 
313         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
314 
315         this.maxPendingTimeouts = maxPendingTimeouts;
316 
317         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
318             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
319             reportTooManyInstances();
320         }
321     }
322 
323     @Override
324     protected void finalize() throws Throwable {
325         try {
326             super.finalize();
327         } finally {
328             // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
329             // we have not yet shutdown then we want to make sure we decrement the active instance count.
330             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
331                 INSTANCE_COUNTER.decrementAndGet();
332             }
333         }
334     }
335 
336     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
337         ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
338 
339         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
340         for (int i = 0; i < wheel.length; i ++) {
341             wheel[i] = new HashedWheelBucket();
342         }
343         return wheel;
344     }
345 
346     /**
347      * Starts the background thread explicitly.  The background thread will
348      * start automatically on demand even if you did not call this method.
349      *
350      * @throws IllegalStateException if this timer has been
351      *                               {@linkplain #stop() stopped} already
352      */
353     public void start() {
354         switch (WORKER_STATE_UPDATER.get(this)) {
355             case WORKER_STATE_INIT:
356                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
357                     workerThread.start();
358                 }
359                 break;
360             case WORKER_STATE_STARTED:
361                 break;
362             case WORKER_STATE_SHUTDOWN:
363                 throw new IllegalStateException("cannot be started once stopped");
364             default:
365                 throw new Error("Invalid WorkerState");
366         }
367 
368         // Wait until the startTime is initialized by the worker.
369         while (startTime == 0) {
370             try {
371                 startTimeInitialized.await();
372             } catch (InterruptedException ignore) {
373                 // Ignore - it will be ready very soon.
374             }
375         }
376     }
377 
378     @Override
379     public Set<Timeout> stop() {
380         if (Thread.currentThread() == workerThread) {
381             throw new IllegalStateException(
382                     HashedWheelTimer.class.getSimpleName() +
383                             ".stop() cannot be called from " +
384                             TimerTask.class.getSimpleName());
385         }
386 
387         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
388             // workerState can be 0 or 2 at this moment - let it always be 2.
389             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
390                 INSTANCE_COUNTER.decrementAndGet();
391                 if (leak != null) {
392                     boolean closed = leak.close(this);
393                     assert closed;
394                 }
395             }
396 
397             return Collections.emptySet();
398         }
399 
400         try {
401             boolean interrupted = false;
402             while (workerThread.isAlive()) {
403                 workerThread.interrupt();
404                 try {
405                     workerThread.join(100);
406                 } catch (InterruptedException ignored) {
407                     interrupted = true;
408                 }
409             }
410 
411             if (interrupted) {
412                 Thread.currentThread().interrupt();
413             }
414         } finally {
415             INSTANCE_COUNTER.decrementAndGet();
416             if (leak != null) {
417                 boolean closed = leak.close(this);
418                 assert closed;
419             }
420         }
421         return worker.unprocessedTimeouts();
422     }
423 
424     @Override
425     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
426         checkNotNull(task, "task");
427         checkNotNull(unit, "unit");
428 
429         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
430 
431         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
432             pendingTimeouts.decrementAndGet();
433             throw new RejectedExecutionException("Number of pending timeouts ("
434                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
435                 + "timeouts (" + maxPendingTimeouts + ")");
436         }
437 
438         start();
439 
440         // Add the timeout to the timeout queue which will be processed on the next tick.
441         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
442         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
443 
444         // Guard against overflow.
445         if (delay > 0 && deadline < 0) {
446             deadline = Long.MAX_VALUE;
447         }
448         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
449         timeouts.add(timeout);
450         return timeout;
451     }
452 
453     /**
454      * Returns the number of pending timeouts of this {@link Timer}.
455      */
456     public long pendingTimeouts() {
457         return pendingTimeouts.get();
458     }
459 
460     private static void reportTooManyInstances() {
461         if (logger.isErrorEnabled()) {
462             String resourceType = simpleClassName(HashedWheelTimer.class);
463             logger.error("You are creating too many " + resourceType + " instances. " +
464                     resourceType + " is a shared resource that must be reused across the JVM, " +
465                     "so that only a few instances are created.");
466         }
467     }
468 
    /**
     * The tick loop executed by the worker thread. On every tick it removes cancelled timeouts, moves newly
     * queued timeouts into their buckets and expires the timeouts of the current bucket. Once the timer is
     * no longer in the started state it collects everything that was not processed.
     */
    private final class Worker implements Runnable {
        // Timeouts that were still scheduled when the loop ended; exposed through unprocessedTimeouts().
        private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

        // Number of ticks processed so far; (tick & mask) is the index of the current bucket.
        private long tick;

        @Override
        public void run() {
            // Initialize the startTime.
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            startTimeInitialized.countDown();

            do {
                final long deadline = waitForNextTick();
                // A non-positive value (Long.MIN_VALUE on shutdown) means there is nothing to process.
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    // Drop cancelled timeouts first so they are neither transferred nor expired.
                    processCancelledTasks();
                    HashedWheelBucket bucket =
                            wheel[idx];
                    transferTimeoutsToBuckets();
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket: wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            // Timeouts that never reached a bucket are unprocessed as well, unless they were cancelled.
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

        /**
         * Moves queued timeouts from the {@code timeouts} queue into the bucket of the tick on which
         * they are due, skipping timeouts that were cancelled in the meantime.
         */
        private void transferTimeoutsToBuckets() {
            // transfer only max. 100000 timeouts per tick to prevent a thread to stall the workerThread when it just
            // adds new timeouts in a loop.
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }

                // Tick number on which the timeout is due, and how many full wheel rotations away that is.
                long calculated = timeout.deadline / tickDuration;
                timeout.remainingRounds = (calculated - tick) / wheel.length;

                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                bucket.addTimeout(timeout);
            }
        }

        /**
         * Drains the {@code cancelledTimeouts} queue and removes every entry; a failing removal is
         * logged and does not stop the draining.
         */
        private void processCancelledTasks() {
            for (;;) {
                HashedWheelTimeout timeout = cancelledTimeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                try {
                    timeout.remove();
                } catch (Throwable t) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("An exception was thrown while process a cancellation task", t);
                    }
                }
            }
        }

        /**
         * calculate goal nanoTime from startTime and current tick number,
         * then wait until that goal has been reached.
         * @return Long.MIN_VALUE if received a shutdown request,
         * current time otherwise (with Long.MIN_VALUE changed by +1)
         */
        private long waitForNextTick() {
            // End of the current tick, in nanoseconds relative to startTime.
            long deadline = tickDuration * (tick + 1);

            for (;;) {
                final long currentTime = System.nanoTime() - startTime;
                // Remaining time rounded up to whole milliseconds.
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

                if (sleepTimeMs <= 0) {
                    // Long.MIN_VALUE is reserved for the shutdown signal, so it is never returned as a time.
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }

                // Check if we run on windows, as if that's the case we will need
                // to round the sleepTime as workaround for a bug that only affect
                // the JVM if it runs on windows.
                //
                // See https://github.com/netty/netty/issues/356
                if (PlatformDependent.isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10 * 10;
                    if (sleepTimeMs == 0) {
                        sleepTimeMs = 1;
                    }
                }

                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    // An interrupt only ends the wait when the timer has been shut down.
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

        /**
         * Returns a read-only view of the timeouts collected as unprocessed when the loop ended.
         */
        public Set<Timeout> unprocessedTimeouts() {
            return Collections.unmodifiableSet(unprocessedTimeouts);
        }
    }
604 
605     private static final class HashedWheelTimeout implements Timeout, Runnable {
606 
607         private static final int ST_INIT = 0;
608         private static final int ST_CANCELLED = 1;
609         private static final int ST_EXPIRED = 2;
610         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
611                 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
612 
613         private final HashedWheelTimer timer;
614         private final TimerTask task;
615         private final long deadline;
616 
617         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
618         private volatile int state = ST_INIT;
619 
620         // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
621         // HashedWheelTimeout will be added to the correct HashedWheelBucket.
622         long remainingRounds;
623 
624         // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
625         // As only the workerThread will act on it there is no need for synchronization / volatile.
626         HashedWheelTimeout next;
627         HashedWheelTimeout prev;
628 
629         // The bucket to which the timeout was added
630         HashedWheelBucket bucket;
631 
632         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
633             this.timer = timer;
634             this.task = task;
635             this.deadline = deadline;
636         }
637 
638         @Override
639         public Timer timer() {
640             return timer;
641         }
642 
643         @Override
644         public TimerTask task() {
645             return task;
646         }
647 
648         @Override
649         public boolean cancel() {
650             // only update the state it will be removed from HashedWheelBucket on next tick.
651             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
652                 return false;
653             }
654             // If a task should be canceled we put this to another queue which will be processed on each tick.
655             // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
656             // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
657             timer.cancelledTimeouts.add(this);
658             return true;
659         }
660 
661         void remove() {
662             HashedWheelBucket bucket = this.bucket;
663             if (bucket != null) {
664                 bucket.remove(this);
665             } else {
666                 timer.pendingTimeouts.decrementAndGet();
667             }
668         }
669 
670         public boolean compareAndSetState(int expected, int state) {
671             return STATE_UPDATER.compareAndSet(this, expected, state);
672         }
673 
674         public int state() {
675             return state;
676         }
677 
678         @Override
679         public boolean isCancelled() {
680             return state() == ST_CANCELLED;
681         }
682 
683         @Override
684         public boolean isExpired() {
685             return state() == ST_EXPIRED;
686         }
687 
688         public void expire() {
689             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
690                 return;
691             }
692 
693             try {
694                 timer.taskExecutor.execute(this);
695             } catch (Throwable t) {
696                 if (logger.isWarnEnabled()) {
697                     logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
698                             + " for execution.", t);
699                 }
700             }
701         }
702 
703         @Override
704         public void run() {
705             try {
706                 task.run(this);
707             } catch (Throwable t) {
708                 if (logger.isWarnEnabled()) {
709                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
710                 }
711             }
712         }
713 
714         @Override
715         public String toString() {
716             final long currentTime = System.nanoTime();
717             long remaining = deadline - currentTime + timer.startTime;
718 
719             StringBuilder buf = new StringBuilder(192)
720                .append(simpleClassName(this))
721                .append('(')
722                .append("deadline: ");
723             if (remaining > 0) {
724                 buf.append(remaining)
725                    .append(" ns later");
726             } else if (remaining < 0) {
727                 buf.append(-remaining)
728                    .append(" ns ago");
729             } else {
730                 buf.append("now");
731             }
732 
733             if (isCancelled()) {
734                 buf.append(", cancelled");
735             }
736 
737             return buf.append(", task: ")
738                       .append(task())
739                       .append(')')
740                       .toString();
741         }
742     }
743 
744     /**
745      * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
746      * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
747      * extra object creation is needed.
748      */
749     private static final class HashedWheelBucket {
750         // Used for the linked-list datastructure
751         private HashedWheelTimeout head;
752         private HashedWheelTimeout tail;
753 
754         /**
755          * Add {@link HashedWheelTimeout} to this bucket.
756          */
757         public void addTimeout(HashedWheelTimeout timeout) {
758             assert timeout.bucket == null;
759             timeout.bucket = this;
760             if (head == null) {
761                 head = tail = timeout;
762             } else {
763                 tail.next = timeout;
764                 timeout.prev = tail;
765                 tail = timeout;
766             }
767         }
768 
769         /**
770          * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
771          */
772         public void expireTimeouts(long deadline) {
773             HashedWheelTimeout timeout = head;
774 
775             // process all timeouts
776             while (timeout != null) {
777                 HashedWheelTimeout next = timeout.next;
778                 if (timeout.remainingRounds <= 0) {
779                     next = remove(timeout);
780                     if (timeout.deadline <= deadline) {
781                         timeout.expire();
782                     } else {
783                         // The timeout was placed into a wrong slot. This should never happen.
784                         throw new IllegalStateException(String.format(
785                                 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
786                     }
787                 } else if (timeout.isCancelled()) {
788                     next = remove(timeout);
789                 } else {
790                     timeout.remainingRounds --;
791                 }
792                 timeout = next;
793             }
794         }
795 
796         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
797             HashedWheelTimeout next = timeout.next;
798             // remove timeout that was either processed or cancelled by updating the linked-list
799             if (timeout.prev != null) {
800                 timeout.prev.next = next;
801             }
802             if (timeout.next != null) {
803                 timeout.next.prev = timeout.prev;
804             }
805 
806             if (timeout == head) {
807                 // if timeout is also the tail we need to adjust the entry too
808                 if (timeout == tail) {
809                     tail = null;
810                     head = null;
811                 } else {
812                     head = next;
813                 }
814             } else if (timeout == tail) {
815                 // if the timeout is the tail modify the tail to be the prev node.
816                 tail = timeout.prev;
817             }
818             // null out prev, next and bucket to allow for GC.
819             timeout.prev = null;
820             timeout.next = null;
821             timeout.bucket = null;
822             timeout.timer.pendingTimeouts.decrementAndGet();
823             return next;
824         }
825 
826         /**
827          * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
828          */
829         public void clearTimeouts(Set<Timeout> set) {
830             for (;;) {
831                 HashedWheelTimeout timeout = pollTimeout();
832                 if (timeout == null) {
833                     return;
834                 }
835                 if (timeout.isExpired() || timeout.isCancelled()) {
836                     continue;
837                 }
838                 set.add(timeout);
839             }
840         }
841 
842         private HashedWheelTimeout pollTimeout() {
843             HashedWheelTimeout head = this.head;
844             if (head == null) {
845                 return null;
846             }
847             HashedWheelTimeout next = head.next;
848             if (next == null) {
849                 tail = this.head =  null;
850             } else {
851                 this.head = next;
852                 next.prev = null;
853             }
854 
855             // null out prev and next to allow for GC.
856             head.next = null;
857             head.prev = null;
858             head.bucket = null;
859             return head;
860         }
861     }
862 }