View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositive;
19  import static io.netty.util.internal.ObjectUtil.checkNotNull;
20  
21  import io.netty.util.concurrent.ImmediateExecutor;
22  import io.netty.util.internal.MathUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.CountDownLatch;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import static io.netty.util.internal.StringUtil.simpleClassName;
43  
44  /**
45   * A {@link Timer} optimized for approximated I/O timeout scheduling.
46   *
47   * <h3>Tick Duration</h3>
48   *
49   * As described with 'approximated', this timer does not execute the scheduled
50   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
51   * check if there are any {@link TimerTask}s behind the schedule and execute
52   * them.
53   * <p>
54   * You can increase or decrease the accuracy of the execution timing by
55   * specifying smaller or larger tick duration in the constructor.  In most
56   * network applications, I/O timeout does not need to be accurate.  Therefore,
57   * the default tick duration is 100 milliseconds and you will not need to try
58   * different configurations in most cases.
59   *
60   * <h3>Ticks per Wheel (Wheel Size)</h3>
61   *
62   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
63   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
64   * function is 'dead line of the task'.  The default number of ticks per wheel
65   * (i.e. the size of the wheel) is 512.  You could specify a larger value
66   * if you are going to schedule a lot of timeouts.
67   *
68   * <h3>Do not create many instances.</h3>
69   *
70   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
71   * started.  Therefore, you should make sure to create only one instance and
72   * share it across your application.  One of the common mistakes, that makes
73   * your application unresponsive, is to create a new instance for every connection.
74   *
75   * <h3>Implementation Details</h3>
76   *
77   * {@link HashedWheelTimer} is based on
78   * <a href="https://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
79   * Tony Lauck's paper,
80   * <a href="https://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
81   * and Hierarchical Timing Wheels: data structures to efficiently implement a
82   * timer facility'</a>.  More comprehensive slides are located
83   * <a href="https://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
84   */
85  public class HashedWheelTimer implements Timer {
86  
87      static final InternalLogger logger =
88              InternalLoggerFactory.getInstance(HashedWheelTimer.class);
89  
90      private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
91      private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
92      private static final int INSTANCE_COUNT_LIMIT = 64;
93      private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
94      private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
95              .newResourceLeakDetector(HashedWheelTimer.class, 1);
96  
97      private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
98              AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
99  
100     private final ResourceLeakTracker<HashedWheelTimer> leak;
101     private final Worker worker = new Worker();
102     private final Thread workerThread;
103 
104     public static final int WORKER_STATE_INIT = 0;
105     public static final int WORKER_STATE_STARTED = 1;
106     public static final int WORKER_STATE_SHUTDOWN = 2;
107     @SuppressWarnings({"unused", "FieldMayBeFinal"})
108     private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
109 
110     private final long tickDuration;
111     private final HashedWheelBucket[] wheel;
112     private final int mask;
113     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
114     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
115     private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
116     private final AtomicLong pendingTimeouts = new AtomicLong(0);
117     private final long maxPendingTimeouts;
118     private final Executor taskExecutor;
119 
120     private volatile long startTime;
121 
122     /**
123      * Creates a new timer with the default thread factory
124      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
125      * default number of ticks per wheel.
126      */
127     public HashedWheelTimer() {
128         this(Executors.defaultThreadFactory());
129     }
130 
131     /**
132      * Creates a new timer with the default thread factory
133      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
134      * per wheel.
135      *
136      * @param tickDuration the duration between tick
137      * @param unit         the time unit of the {@code tickDuration}
138      * @throws NullPointerException     if {@code unit} is {@code null}
139      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
140      */
141     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
142         this(Executors.defaultThreadFactory(), tickDuration, unit);
143     }
144 
145     /**
146      * Creates a new timer with the default thread factory
147      * ({@link Executors#defaultThreadFactory()}).
148      *
149      * @param tickDuration  the duration between tick
150      * @param unit          the time unit of the {@code tickDuration}
151      * @param ticksPerWheel the size of the wheel
152      * @throws NullPointerException     if {@code unit} is {@code null}
153      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
154      */
155     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
156         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
157     }
158 
159     /**
160      * Creates a new timer with the default tick duration and default number of
161      * ticks per wheel.
162      *
163      * @param threadFactory a {@link ThreadFactory} that creates a
164      *                      background {@link Thread} which is dedicated to
165      *                      {@link TimerTask} execution.
166      * @throws NullPointerException if {@code threadFactory} is {@code null}
167      */
168     public HashedWheelTimer(ThreadFactory threadFactory) {
169         this(threadFactory, 100, TimeUnit.MILLISECONDS);
170     }
171 
172     /**
173      * Creates a new timer with the default number of ticks per wheel.
174      *
175      * @param threadFactory a {@link ThreadFactory} that creates a
176      *                      background {@link Thread} which is dedicated to
177      *                      {@link TimerTask} execution.
178      * @param tickDuration  the duration between tick
179      * @param unit          the time unit of the {@code tickDuration}
180      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
181      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
182      */
183     public HashedWheelTimer(
184             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
185         this(threadFactory, tickDuration, unit, 512);
186     }
187 
188     /**
189      * Creates a new timer.
190      *
191      * @param threadFactory a {@link ThreadFactory} that creates a
192      *                      background {@link Thread} which is dedicated to
193      *                      {@link TimerTask} execution.
194      * @param tickDuration  the duration between tick
195      * @param unit          the time unit of the {@code tickDuration}
196      * @param ticksPerWheel the size of the wheel
197      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
198      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
199      */
200     public HashedWheelTimer(
201             ThreadFactory threadFactory,
202             long tickDuration, TimeUnit unit, int ticksPerWheel) {
203         this(threadFactory, tickDuration, unit, ticksPerWheel, true);
204     }
205 
206     /**
207      * Creates a new timer.
208      *
209      * @param threadFactory a {@link ThreadFactory} that creates a
210      *                      background {@link Thread} which is dedicated to
211      *                      {@link TimerTask} execution.
212      * @param tickDuration  the duration between tick
213      * @param unit          the time unit of the {@code tickDuration}
214      * @param ticksPerWheel the size of the wheel
215      * @param leakDetection {@code true} if leak detection should be enabled always,
216      *                      if false it will only be enabled if the worker thread is not
217      *                      a daemon thread.
218      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
219      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
220      */
221     public HashedWheelTimer(
222             ThreadFactory threadFactory,
223             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
224         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
225     }
226 
227     /**
228      * Creates a new timer.
229      *
230      * @param threadFactory        a {@link ThreadFactory} that creates a
231      *                             background {@link Thread} which is dedicated to
232      *                             {@link TimerTask} execution.
233      * @param tickDuration         the duration between tick
234      * @param unit                 the time unit of the {@code tickDuration}
235      * @param ticksPerWheel        the size of the wheel
236      * @param leakDetection        {@code true} if leak detection should be enabled always,
237      *                             if false it will only be enabled if the worker thread is not
238      *                             a daemon thread.
239      * @param  maxPendingTimeouts  The maximum number of pending timeouts after which call to
240      *                             {@code newTimeout} will result in
241      *                             {@link java.util.concurrent.RejectedExecutionException}
242      *                             being thrown. No maximum pending timeouts limit is assumed if
243      *                             this value is 0 or negative.
244      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
245      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
246      */
247     public HashedWheelTimer(
248             ThreadFactory threadFactory,
249             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
250             long maxPendingTimeouts) {
251         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
252                 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
253     }
254     /**
255      * Creates a new timer.
256      *
257      * @param threadFactory        a {@link ThreadFactory} that creates a
258      *                             background {@link Thread} which is dedicated to
259      *                             {@link TimerTask} execution.
260      * @param tickDuration         the duration between tick
261      * @param unit                 the time unit of the {@code tickDuration}
262      * @param ticksPerWheel        the size of the wheel
263      * @param leakDetection        {@code true} if leak detection should be enabled always,
264      *                             if false it will only be enabled if the worker thread is not
265      *                             a daemon thread.
266      * @param maxPendingTimeouts   The maximum number of pending timeouts after which call to
267      *                             {@code newTimeout} will result in
268      *                             {@link java.util.concurrent.RejectedExecutionException}
269      *                             being thrown. No maximum pending timeouts limit is assumed if
270      *                             this value is 0 or negative.
271      * @param taskExecutor         The {@link Executor} that is used to execute the submitted {@link TimerTask}s.
272      *                             The caller is responsible to shutdown the {@link Executor} once it is not needed
273      *                             anymore.
274      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
275      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
276      */
277     public HashedWheelTimer(
278             ThreadFactory threadFactory,
279             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
280             long maxPendingTimeouts, Executor taskExecutor) {
281 
282         checkNotNull(threadFactory, "threadFactory");
283         checkNotNull(unit, "unit");
284         checkPositive(tickDuration, "tickDuration");
285         checkPositive(ticksPerWheel, "ticksPerWheel");
286         this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
287 
288         // Normalize ticksPerWheel to power of two and initialize the wheel.
289         wheel = createWheel(ticksPerWheel);
290         mask = wheel.length - 1;
291 
292         // Convert tickDuration to nanos.
293         long duration = unit.toNanos(tickDuration);
294 
295         // Prevent overflow.
296         if (duration >= Long.MAX_VALUE / wheel.length) {
297             throw new IllegalArgumentException(String.format(
298                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
299                     tickDuration, Long.MAX_VALUE / wheel.length));
300         }
301 
302         if (duration < MILLISECOND_NANOS) {
303             logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
304                         tickDuration, MILLISECOND_NANOS);
305             this.tickDuration = MILLISECOND_NANOS;
306         } else {
307             this.tickDuration = duration;
308         }
309 
310         workerThread = threadFactory.newThread(worker);
311 
312         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
313 
314         this.maxPendingTimeouts = maxPendingTimeouts;
315 
316         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
317             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
318             reportTooManyInstances();
319         }
320     }
321 
322     @Override
323     protected void finalize() throws Throwable {
324         try {
325             super.finalize();
326         } finally {
327             // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
328             // we have not yet shutdown then we want to make sure we decrement the active instance count.
329             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
330                 INSTANCE_COUNTER.decrementAndGet();
331             }
332         }
333     }
334 
335     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
336         ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
337 
338         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
339         for (int i = 0; i < wheel.length; i ++) {
340             wheel[i] = new HashedWheelBucket();
341         }
342         return wheel;
343     }
344 
345     /**
346      * Starts the background thread explicitly.  The background thread will
347      * start automatically on demand even if you did not call this method.
348      *
349      * @throws IllegalStateException if this timer has been
350      *                               {@linkplain #stop() stopped} already
351      */
352     public void start() {
353         int state = WORKER_STATE_UPDATER.get(this);
354         switch (state) {
355             case WORKER_STATE_INIT:
356                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
357                     workerThread.start();
358                 }
359                 break;
360             case WORKER_STATE_STARTED:
361                 break;
362             case WORKER_STATE_SHUTDOWN:
363                 throw new IllegalStateException("cannot be started once stopped");
364             default:
365                 throw new Error("Invalid WorkerState: " + state);
366         }
367 
368         // Wait until the startTime is initialized by the worker.
369         while (startTime == 0) {
370             try {
371                 startTimeInitialized.await();
372             } catch (InterruptedException ignore) {
373                 // Ignore - it will be ready very soon.
374             }
375         }
376     }
377 
378     @Override
379     public Set<Timeout> stop() {
380         if (Thread.currentThread() == workerThread) {
381             throw new IllegalStateException(
382                     HashedWheelTimer.class.getSimpleName() +
383                             ".stop() cannot be called from " +
384                             TimerTask.class.getSimpleName());
385         }
386 
387         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
388             // workerState can be 0 or 2 at this moment - let it always be 2.
389             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
390                 INSTANCE_COUNTER.decrementAndGet();
391                 if (leak != null) {
392                     boolean closed = leak.close(this);
393                     assert closed;
394                 }
395             }
396 
397             return Collections.emptySet();
398         }
399 
400         try {
401             boolean interrupted = false;
402             while (workerThread.isAlive()) {
403                 workerThread.interrupt();
404                 try {
405                     workerThread.join(100);
406                 } catch (InterruptedException ignored) {
407                     interrupted = true;
408                 }
409             }
410 
411             if (interrupted) {
412                 Thread.currentThread().interrupt();
413             }
414         } finally {
415             INSTANCE_COUNTER.decrementAndGet();
416             if (leak != null) {
417                 boolean closed = leak.close(this);
418                 assert closed;
419             }
420         }
421         Set<Timeout> unprocessed = worker.unprocessedTimeouts();
422         Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
423         for (Timeout timeout : unprocessed) {
424             if (timeout.cancel()) {
425                 cancelled.add(timeout);
426             }
427         }
428         return cancelled;
429     }
430 
431     @Override
432     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
433         checkNotNull(task, "task");
434         checkNotNull(unit, "unit");
435 
436         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
437 
438         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
439             pendingTimeouts.decrementAndGet();
440             throw new RejectedExecutionException("Number of pending timeouts ("
441                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
442                 + "timeouts (" + maxPendingTimeouts + ")");
443         }
444 
445         start();
446 
447         // Add the timeout to the timeout queue which will be processed on the next tick.
448         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
449         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
450 
451         // Guard against overflow.
452         if (delay > 0 && deadline < 0) {
453             deadline = Long.MAX_VALUE;
454         }
455         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
456         timeouts.add(timeout);
457         return timeout;
458     }
459 
460     /**
461      * Returns the number of pending timeouts of this {@link Timer}.
462      */
463     public long pendingTimeouts() {
464         return pendingTimeouts.get();
465     }
466 
467     private static void reportTooManyInstances() {
468         if (logger.isErrorEnabled()) {
469             String resourceType = simpleClassName(HashedWheelTimer.class);
470             logger.error("You are creating too many " + resourceType + " instances. " +
471                     resourceType + " is a shared resource that must be reused across the JVM, " +
472                     "so that only a few instances are created.");
473         }
474     }
475 
476     private final class Worker implements Runnable {
477         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
478 
479         private long tick;
480 
481         @Override
482         public void run() {
483             // Initialize the startTime.
484             startTime = System.nanoTime();
485             if (startTime == 0) {
486                 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
487                 startTime = 1;
488             }
489 
490             // Notify the other threads waiting for the initialization at start().
491             startTimeInitialized.countDown();
492 
493             do {
494                 final long deadline = waitForNextTick();
495                 if (deadline > 0) {
496                     int idx = (int) (tick & mask);
497                     processCancelledTasks();
498                     HashedWheelBucket bucket =
499                             wheel[idx];
500                     transferTimeoutsToBuckets();
501                     bucket.expireTimeouts(deadline);
502                     tick++;
503                 }
504             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
505 
506             // Fill the unprocessedTimeouts so we can return them from stop() method.
507             for (HashedWheelBucket bucket: wheel) {
508                 bucket.clearTimeouts(unprocessedTimeouts);
509             }
510             for (;;) {
511                 HashedWheelTimeout timeout = timeouts.poll();
512                 if (timeout == null) {
513                     break;
514                 }
515                 if (!timeout.isCancelled()) {
516                     unprocessedTimeouts.add(timeout);
517                 }
518             }
519             processCancelledTasks();
520         }
521 
522         private void transferTimeoutsToBuckets() {
523             // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
524             // adds new timeouts in a loop.
525             for (int i = 0; i < 100000; i++) {
526                 HashedWheelTimeout timeout = timeouts.poll();
527                 if (timeout == null) {
528                     // all processed
529                     break;
530                 }
531                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
532                     // Was cancelled in the meantime.
533                     continue;
534                 }
535 
536                 long calculated = timeout.deadline / tickDuration;
537                 timeout.remainingRounds = (calculated - tick) / wheel.length;
538 
539                 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
540                 int stopIndex = (int) (ticks & mask);
541 
542                 HashedWheelBucket bucket = wheel[stopIndex];
543                 bucket.addTimeout(timeout);
544             }
545         }
546 
547         private void processCancelledTasks() {
548             for (;;) {
549                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
550                 if (timeout == null) {
551                     // all processed
552                     break;
553                 }
554                 try {
555                     timeout.removeAfterCancellation();
556                 } catch (Throwable t) {
557                     if (logger.isWarnEnabled()) {
558                         logger.warn("An exception was thrown while process a cancellation task", t);
559                     }
560                 }
561             }
562         }
563 
564         /**
565          * calculate goal nanoTime from startTime and current tick number,
566          * then wait until that goal has been reached.
567          * @return Long.MIN_VALUE if received a shutdown request,
568          * current time otherwise (with Long.MIN_VALUE changed by +1)
569          */
570         private long waitForNextTick() {
571             long deadline = tickDuration * (tick + 1);
572 
573             for (;;) {
574                 final long currentTime = System.nanoTime() - startTime;
575                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
576 
577                 if (sleepTimeMs <= 0) {
578                     if (currentTime == Long.MIN_VALUE) {
579                         return -Long.MAX_VALUE;
580                     } else {
581                         return currentTime;
582                     }
583                 }
584 
585                 // Check if we run on windows, as if thats the case we will need
586                 // to round the sleepTime as workaround for a bug that only affect
587                 // the JVM if it runs on windows.
588                 //
589                 // See https://github.com/netty/netty/issues/356
590                 if (PlatformDependent.isWindows()) {
591                     sleepTimeMs = sleepTimeMs / 10 * 10;
592                     if (sleepTimeMs == 0) {
593                         sleepTimeMs = 1;
594                     }
595                 }
596 
597                 try {
598                     Thread.sleep(sleepTimeMs);
599                 } catch (InterruptedException ignored) {
600                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
601                         return Long.MIN_VALUE;
602                     }
603                 }
604             }
605         }
606 
607         public Set<Timeout> unprocessedTimeouts() {
608             return Collections.unmodifiableSet(unprocessedTimeouts);
609         }
610     }
611 
612     private static final class HashedWheelTimeout implements Timeout, Runnable {
613 
614         private static final int ST_INIT = 0;
615         private static final int ST_CANCELLED = 1;
616         private static final int ST_EXPIRED = 2;
617         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
618                 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
619 
620         private final HashedWheelTimer timer;
621         private final TimerTask task;
622         private final long deadline;
623 
624         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
625         private volatile int state = ST_INIT;
626 
627         // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
628         // HashedWheelTimeout will be added to the correct HashedWheelBucket.
629         long remainingRounds;
630 
631         // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
632         // As only the workerThread will act on it there is no need for synchronization / volatile.
633         HashedWheelTimeout next;
634         HashedWheelTimeout prev;
635 
636         // The bucket to which the timeout was added
637         HashedWheelBucket bucket;
638 
639         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
640             this.timer = timer;
641             this.task = task;
642             this.deadline = deadline;
643         }
644 
645         @Override
646         public Timer timer() {
647             return timer;
648         }
649 
650         @Override
651         public TimerTask task() {
652             return task;
653         }
654 
655         @Override
656         public boolean cancel() {
657             // only update the state it will be removed from HashedWheelBucket on next tick.
658             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
659                 return false;
660             }
661             // If a task should be canceled we put this to another queue which will be processed on each tick.
662             // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
663             // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
664             timer.cancelledTimeouts.add(this);
665             return true;
666         }
667 
668         private void remove() {
669             HashedWheelBucket bucket = this.bucket;
670             if (bucket != null) {
671                 bucket.remove(this);
672             }
673             timer.pendingTimeouts.decrementAndGet();
674         }
675         void removeAfterCancellation() {
676             remove();
677             task.cancelled(this);
678         }
679 
680         public boolean compareAndSetState(int expected, int state) {
681             return STATE_UPDATER.compareAndSet(this, expected, state);
682         }
683 
684         public int state() {
685             return state;
686         }
687 
688         @Override
689         public boolean isCancelled() {
690             return state() == ST_CANCELLED;
691         }
692 
693         @Override
694         public boolean isExpired() {
695             return state() == ST_EXPIRED;
696         }
697 
698         public void expire() {
699             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
700                 return;
701             }
702 
703             try {
704                 remove();
705                 timer.taskExecutor.execute(this);
706             } catch (Throwable t) {
707                 if (logger.isWarnEnabled()) {
708                     logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
709                             + " for execution.", t);
710                 }
711             }
712         }
713 
714         @Override
715         public void run() {
716             try {
717                 task.run(this);
718             } catch (Throwable t) {
719                 if (logger.isWarnEnabled()) {
720                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
721                 }
722             }
723         }
724 
725         @Override
726         public String toString() {
727             final long currentTime = System.nanoTime();
728             long remaining = deadline - currentTime + timer.startTime;
729 
730             StringBuilder buf = new StringBuilder(192)
731                .append(simpleClassName(this))
732                .append('(')
733                .append("deadline: ");
734             if (remaining > 0) {
735                 buf.append(remaining)
736                    .append(" ns later");
737             } else if (remaining < 0) {
738                 buf.append(-remaining)
739                    .append(" ns ago");
740             } else {
741                 buf.append("now");
742             }
743 
744             if (isCancelled()) {
745                 buf.append(", cancelled");
746             }
747 
748             return buf.append(", task: ")
749                       .append(task())
750                       .append(')')
751                       .toString();
752         }
753     }
754 
755     /**
756      * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
757      * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
758      * extra object creation is needed.
759      */
760     private static final class HashedWheelBucket {
761         // Used for the linked-list datastructure
762         private HashedWheelTimeout head;
763         private HashedWheelTimeout tail;
764 
765         /**
766          * Add {@link HashedWheelTimeout} to this bucket.
767          */
768         public void addTimeout(HashedWheelTimeout timeout) {
769             assert timeout.bucket == null;
770             timeout.bucket = this;
771             if (head == null) {
772                 head = tail = timeout;
773             } else {
774                 tail.next = timeout;
775                 timeout.prev = tail;
776                 tail = timeout;
777             }
778         }
779 
780         /**
781          * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
782          */
783         public void expireTimeouts(long deadline) {
784             HashedWheelTimeout timeout = head;
785 
786             // process all timeouts
787             while (timeout != null) {
788                 HashedWheelTimeout next = timeout.next;
789                 if (timeout.remainingRounds <= 0) {
790                     if (timeout.deadline <= deadline) {
791                         timeout.expire();
792                     } else {
793                         // The timeout was placed into a wrong slot. This should never happen.
794                         throw new IllegalStateException(String.format(
795                                 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
796                     }
797                 } else if (!timeout.isCancelled()) {
798                     timeout.remainingRounds --;
799                 }
800                 timeout = next;
801             }
802         }
803 
804         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
805             HashedWheelTimeout next = timeout.next;
806             // remove timeout that was either processed or cancelled by updating the linked-list
807             if (timeout.prev != null) {
808                 timeout.prev.next = next;
809             }
810             if (timeout.next != null) {
811                 timeout.next.prev = timeout.prev;
812             }
813 
814             if (timeout == head) {
815                 // if timeout is also the tail we need to adjust the entry too
816                 if (timeout == tail) {
817                     tail = null;
818                     head = null;
819                 } else {
820                     head = next;
821                 }
822             } else if (timeout == tail) {
823                 // if the timeout is the tail modify the tail to be the prev node.
824                 tail = timeout.prev;
825             }
826             // null out prev, next and bucket to allow for GC.
827             timeout.prev = null;
828             timeout.next = null;
829             timeout.bucket = null;
830             return next;
831         }
832 
833         /**
834          * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
835          */
836         public void clearTimeouts(Set<Timeout> set) {
837             for (;;) {
838                 HashedWheelTimeout timeout = pollTimeout();
839                 if (timeout == null) {
840                     return;
841                 }
842                 if (timeout.isExpired() || timeout.isCancelled()) {
843                     continue;
844                 }
845                 set.add(timeout);
846             }
847         }
848 
849         private HashedWheelTimeout pollTimeout() {
850             HashedWheelTimeout head = this.head;
851             if (head == null) {
852                 return null;
853             }
854             HashedWheelTimeout next = head.next;
855             if (next == null) {
856                 tail = this.head =  null;
857             } else {
858                 this.head = next;
859                 next.prev = null;
860             }
861 
862             // null out prev and next to allow for GC.
863             head.next = null;
864             head.prev = null;
865             head.bucket = null;
866             return head;
867         }
868     }
869 }