View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import static io.netty.util.internal.ObjectUtil.checkInRange;
19  import static io.netty.util.internal.ObjectUtil.checkPositive;
20  import static io.netty.util.internal.ObjectUtil.checkNotNull;
21  
22  import io.netty.util.concurrent.ImmediateExecutor;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.CountDownLatch;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import static io.netty.util.internal.StringUtil.simpleClassName;
43  
44  /**
45   * A {@link Timer} optimized for approximated I/O timeout scheduling.
46   *
47   * <h3>Tick Duration</h3>
48   *
49   * As described with 'approximated', this timer does not execute the scheduled
50   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
51   * check if there are any {@link TimerTask}s behind the schedule and execute
52   * them.
53   * <p>
54   * You can increase or decrease the accuracy of the execution timing by
55   * specifying smaller or larger tick duration in the constructor.  In most
56   * network applications, I/O timeout does not need to be accurate.  Therefore,
57   * the default tick duration is 100 milliseconds and you will not need to try
58   * different configurations in most cases.
59   *
60   * <h3>Ticks per Wheel (Wheel Size)</h3>
61   *
62   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
63   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
64   * function is 'dead line of the task'.  The default number of ticks per wheel
65   * (i.e. the size of the wheel) is 512.  You could specify a larger value
66   * if you are going to schedule a lot of timeouts.
67   *
68   * <h3>Do not create many instances.</h3>
69   *
70   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
71   * started.  Therefore, you should make sure to create only one instance and
72   * share it across your application.  One of the common mistakes, that makes
73   * your application unresponsive, is to create a new instance for every connection.
74   *
75   * <h3>Implementation Details</h3>
76   *
77   * {@link HashedWheelTimer} is based on
78   * <a href="https://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
79   * Tony Lauck's paper,
80   * <a href="https://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
81   * and Hierarchical Timing Wheels: data structures to efficiently implement a
82   * timer facility'</a>.  More comprehensive slides are located
83   * <a href="https://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
84   */
85  public class HashedWheelTimer implements Timer {
86  
87      static final InternalLogger logger =
88              InternalLoggerFactory.getInstance(HashedWheelTimer.class);
89  
90      private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
91      private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
92      private static final int INSTANCE_COUNT_LIMIT = 64;
93      private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
94      private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
95              .newResourceLeakDetector(HashedWheelTimer.class, 1);
96  
97      private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
98              AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
99  
100     private final ResourceLeakTracker<HashedWheelTimer> leak;
101     private final Worker worker = new Worker();
102     private final Thread workerThread;
103 
104     public static final int WORKER_STATE_INIT = 0;
105     public static final int WORKER_STATE_STARTED = 1;
106     public static final int WORKER_STATE_SHUTDOWN = 2;
107     @SuppressWarnings({"unused", "FieldMayBeFinal"})
108     private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
109 
110     private final long tickDuration;
111     private final HashedWheelBucket[] wheel;
112     private final int mask;
113     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
114     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
115     private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
116     private final AtomicLong pendingTimeouts = new AtomicLong(0);
117     private final long maxPendingTimeouts;
118     private final Executor taskExecutor;
119 
120     private volatile long startTime;
121 
122     /**
123      * Creates a new timer with the default thread factory
124      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
125      * default number of ticks per wheel.
126      */
127     public HashedWheelTimer() {
128         this(Executors.defaultThreadFactory());
129     }
130 
131     /**
132      * Creates a new timer with the default thread factory
133      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
134      * per wheel.
135      *
136      * @param tickDuration the duration between tick
137      * @param unit         the time unit of the {@code tickDuration}
138      * @throws NullPointerException     if {@code unit} is {@code null}
139      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
140      */
141     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
142         this(Executors.defaultThreadFactory(), tickDuration, unit);
143     }
144 
145     /**
146      * Creates a new timer with the default thread factory
147      * ({@link Executors#defaultThreadFactory()}).
148      *
149      * @param tickDuration  the duration between tick
150      * @param unit          the time unit of the {@code tickDuration}
151      * @param ticksPerWheel the size of the wheel
152      * @throws NullPointerException     if {@code unit} is {@code null}
153      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
154      */
155     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
156         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
157     }
158 
159     /**
160      * Creates a new timer with the default tick duration and default number of
161      * ticks per wheel.
162      *
163      * @param threadFactory a {@link ThreadFactory} that creates a
164      *                      background {@link Thread} which is dedicated to
165      *                      {@link TimerTask} execution.
166      * @throws NullPointerException if {@code threadFactory} is {@code null}
167      */
168     public HashedWheelTimer(ThreadFactory threadFactory) {
169         this(threadFactory, 100, TimeUnit.MILLISECONDS);
170     }
171 
172     /**
173      * Creates a new timer with the default number of ticks per wheel.
174      *
175      * @param threadFactory a {@link ThreadFactory} that creates a
176      *                      background {@link Thread} which is dedicated to
177      *                      {@link TimerTask} execution.
178      * @param tickDuration  the duration between tick
179      * @param unit          the time unit of the {@code tickDuration}
180      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
181      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
182      */
183     public HashedWheelTimer(
184             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
185         this(threadFactory, tickDuration, unit, 512);
186     }
187 
188     /**
189      * Creates a new timer.
190      *
191      * @param threadFactory a {@link ThreadFactory} that creates a
192      *                      background {@link Thread} which is dedicated to
193      *                      {@link TimerTask} execution.
194      * @param tickDuration  the duration between tick
195      * @param unit          the time unit of the {@code tickDuration}
196      * @param ticksPerWheel the size of the wheel
197      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
198      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
199      */
200     public HashedWheelTimer(
201             ThreadFactory threadFactory,
202             long tickDuration, TimeUnit unit, int ticksPerWheel) {
203         this(threadFactory, tickDuration, unit, ticksPerWheel, true);
204     }
205 
206     /**
207      * Creates a new timer.
208      *
209      * @param threadFactory a {@link ThreadFactory} that creates a
210      *                      background {@link Thread} which is dedicated to
211      *                      {@link TimerTask} execution.
212      * @param tickDuration  the duration between tick
213      * @param unit          the time unit of the {@code tickDuration}
214      * @param ticksPerWheel the size of the wheel
215      * @param leakDetection {@code true} if leak detection should be enabled always,
216      *                      if false it will only be enabled if the worker thread is not
217      *                      a daemon thread.
218      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
219      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
220      */
221     public HashedWheelTimer(
222             ThreadFactory threadFactory,
223             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
224         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
225     }
226 
227     /**
228      * Creates a new timer.
229      *
230      * @param threadFactory        a {@link ThreadFactory} that creates a
231      *                             background {@link Thread} which is dedicated to
232      *                             {@link TimerTask} execution.
233      * @param tickDuration         the duration between tick
234      * @param unit                 the time unit of the {@code tickDuration}
235      * @param ticksPerWheel        the size of the wheel
236      * @param leakDetection        {@code true} if leak detection should be enabled always,
237      *                             if false it will only be enabled if the worker thread is not
238      *                             a daemon thread.
239      * @param  maxPendingTimeouts  The maximum number of pending timeouts after which call to
240      *                             {@code newTimeout} will result in
241      *                             {@link java.util.concurrent.RejectedExecutionException}
242      *                             being thrown. No maximum pending timeouts limit is assumed if
243      *                             this value is 0 or negative.
244      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
245      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
246      */
247     public HashedWheelTimer(
248             ThreadFactory threadFactory,
249             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
250             long maxPendingTimeouts) {
251         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
252                 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
253     }
254     /**
255      * Creates a new timer.
256      *
257      * @param threadFactory        a {@link ThreadFactory} that creates a
258      *                             background {@link Thread} which is dedicated to
259      *                             {@link TimerTask} execution.
260      * @param tickDuration         the duration between tick
261      * @param unit                 the time unit of the {@code tickDuration}
262      * @param ticksPerWheel        the size of the wheel
263      * @param leakDetection        {@code true} if leak detection should be enabled always,
264      *                             if false it will only be enabled if the worker thread is not
265      *                             a daemon thread.
266      * @param maxPendingTimeouts   The maximum number of pending timeouts after which call to
267      *                             {@code newTimeout} will result in
268      *                             {@link java.util.concurrent.RejectedExecutionException}
269      *                             being thrown. No maximum pending timeouts limit is assumed if
270      *                             this value is 0 or negative.
271      * @param taskExecutor         The {@link Executor} that is used to execute the submitted {@link TimerTask}s.
272      *                             The caller is responsible to shutdown the {@link Executor} once it is not needed
273      *                             anymore.
274      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
275      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
276      */
277     public HashedWheelTimer(
278             ThreadFactory threadFactory,
279             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
280             long maxPendingTimeouts, Executor taskExecutor) {
281 
282         checkNotNull(threadFactory, "threadFactory");
283         checkNotNull(unit, "unit");
284         checkPositive(tickDuration, "tickDuration");
285         checkPositive(ticksPerWheel, "ticksPerWheel");
286         this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
287 
288         // Normalize ticksPerWheel to power of two and initialize the wheel.
289         wheel = createWheel(ticksPerWheel);
290         mask = wheel.length - 1;
291 
292         // Convert tickDuration to nanos.
293         long duration = unit.toNanos(tickDuration);
294 
295         // Prevent overflow.
296         if (duration >= Long.MAX_VALUE / wheel.length) {
297             throw new IllegalArgumentException(String.format(
298                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
299                     tickDuration, Long.MAX_VALUE / wheel.length));
300         }
301 
302         if (duration < MILLISECOND_NANOS) {
303             logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
304                         tickDuration, MILLISECOND_NANOS);
305             this.tickDuration = MILLISECOND_NANOS;
306         } else {
307             this.tickDuration = duration;
308         }
309 
310         workerThread = threadFactory.newThread(worker);
311 
312         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
313 
314         this.maxPendingTimeouts = maxPendingTimeouts;
315 
316         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
317             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
318             reportTooManyInstances();
319         }
320     }
321 
322     @Override
323     protected void finalize() throws Throwable {
324         try {
325             super.finalize();
326         } finally {
327             // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
328             // we have not yet shutdown then we want to make sure we decrement the active instance count.
329             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
330                 INSTANCE_COUNTER.decrementAndGet();
331             }
332         }
333     }
334 
335     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
336         //ticksPerWheel may not be greater than 2^30
337         checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
338 
339         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
340         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
341         for (int i = 0; i < wheel.length; i ++) {
342             wheel[i] = new HashedWheelBucket();
343         }
344         return wheel;
345     }
346 
347     private static int normalizeTicksPerWheel(int ticksPerWheel) {
348         int normalizedTicksPerWheel = 1;
349         while (normalizedTicksPerWheel < ticksPerWheel) {
350             normalizedTicksPerWheel <<= 1;
351         }
352         return normalizedTicksPerWheel;
353     }
354 
355     /**
356      * Starts the background thread explicitly.  The background thread will
357      * start automatically on demand even if you did not call this method.
358      *
359      * @throws IllegalStateException if this timer has been
360      *                               {@linkplain #stop() stopped} already
361      */
362     public void start() {
363         switch (WORKER_STATE_UPDATER.get(this)) {
364             case WORKER_STATE_INIT:
365                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
366                     workerThread.start();
367                 }
368                 break;
369             case WORKER_STATE_STARTED:
370                 break;
371             case WORKER_STATE_SHUTDOWN:
372                 throw new IllegalStateException("cannot be started once stopped");
373             default:
374                 throw new Error("Invalid WorkerState");
375         }
376 
377         // Wait until the startTime is initialized by the worker.
378         while (startTime == 0) {
379             try {
380                 startTimeInitialized.await();
381             } catch (InterruptedException ignore) {
382                 // Ignore - it will be ready very soon.
383             }
384         }
385     }
386 
387     @Override
388     public Set<Timeout> stop() {
389         if (Thread.currentThread() == workerThread) {
390             throw new IllegalStateException(
391                     HashedWheelTimer.class.getSimpleName() +
392                             ".stop() cannot be called from " +
393                             TimerTask.class.getSimpleName());
394         }
395 
396         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
397             // workerState can be 0 or 2 at this moment - let it always be 2.
398             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
399                 INSTANCE_COUNTER.decrementAndGet();
400                 if (leak != null) {
401                     boolean closed = leak.close(this);
402                     assert closed;
403                 }
404             }
405 
406             return Collections.emptySet();
407         }
408 
409         try {
410             boolean interrupted = false;
411             while (workerThread.isAlive()) {
412                 workerThread.interrupt();
413                 try {
414                     workerThread.join(100);
415                 } catch (InterruptedException ignored) {
416                     interrupted = true;
417                 }
418             }
419 
420             if (interrupted) {
421                 Thread.currentThread().interrupt();
422             }
423         } finally {
424             INSTANCE_COUNTER.decrementAndGet();
425             if (leak != null) {
426                 boolean closed = leak.close(this);
427                 assert closed;
428             }
429         }
430         return worker.unprocessedTimeouts();
431     }
432 
433     @Override
434     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
435         checkNotNull(task, "task");
436         checkNotNull(unit, "unit");
437 
438         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
439 
440         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
441             pendingTimeouts.decrementAndGet();
442             throw new RejectedExecutionException("Number of pending timeouts ("
443                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
444                 + "timeouts (" + maxPendingTimeouts + ")");
445         }
446 
447         start();
448 
449         // Add the timeout to the timeout queue which will be processed on the next tick.
450         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
451         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
452 
453         // Guard against overflow.
454         if (delay > 0 && deadline < 0) {
455             deadline = Long.MAX_VALUE;
456         }
457         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
458         timeouts.add(timeout);
459         return timeout;
460     }
461 
462     /**
463      * Returns the number of pending timeouts of this {@link Timer}.
464      */
465     public long pendingTimeouts() {
466         return pendingTimeouts.get();
467     }
468 
469     private static void reportTooManyInstances() {
470         if (logger.isErrorEnabled()) {
471             String resourceType = simpleClassName(HashedWheelTimer.class);
472             logger.error("You are creating too many " + resourceType + " instances. " +
473                     resourceType + " is a shared resource that must be reused across the JVM, " +
474                     "so that only a few instances are created.");
475         }
476     }
477 
478     private final class Worker implements Runnable {
479         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
480 
481         private long tick;
482 
483         @Override
484         public void run() {
485             // Initialize the startTime.
486             startTime = System.nanoTime();
487             if (startTime == 0) {
488                 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
489                 startTime = 1;
490             }
491 
492             // Notify the other threads waiting for the initialization at start().
493             startTimeInitialized.countDown();
494 
495             do {
496                 final long deadline = waitForNextTick();
497                 if (deadline > 0) {
498                     int idx = (int) (tick & mask);
499                     processCancelledTasks();
500                     HashedWheelBucket bucket =
501                             wheel[idx];
502                     transferTimeoutsToBuckets();
503                     bucket.expireTimeouts(deadline);
504                     tick++;
505                 }
506             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
507 
508             // Fill the unprocessedTimeouts so we can return them from stop() method.
509             for (HashedWheelBucket bucket: wheel) {
510                 bucket.clearTimeouts(unprocessedTimeouts);
511             }
512             for (;;) {
513                 HashedWheelTimeout timeout = timeouts.poll();
514                 if (timeout == null) {
515                     break;
516                 }
517                 if (!timeout.isCancelled()) {
518                     unprocessedTimeouts.add(timeout);
519                 }
520             }
521             processCancelledTasks();
522         }
523 
524         private void transferTimeoutsToBuckets() {
525             // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
526             // adds new timeouts in a loop.
527             for (int i = 0; i < 100000; i++) {
528                 HashedWheelTimeout timeout = timeouts.poll();
529                 if (timeout == null) {
530                     // all processed
531                     break;
532                 }
533                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
534                     // Was cancelled in the meantime.
535                     continue;
536                 }
537 
538                 long calculated = timeout.deadline / tickDuration;
539                 timeout.remainingRounds = (calculated - tick) / wheel.length;
540 
541                 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
542                 int stopIndex = (int) (ticks & mask);
543 
544                 HashedWheelBucket bucket = wheel[stopIndex];
545                 bucket.addTimeout(timeout);
546             }
547         }
548 
549         private void processCancelledTasks() {
550             for (;;) {
551                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
552                 if (timeout == null) {
553                     // all processed
554                     break;
555                 }
556                 try {
557                     timeout.remove();
558                 } catch (Throwable t) {
559                     if (logger.isWarnEnabled()) {
560                         logger.warn("An exception was thrown while process a cancellation task", t);
561                     }
562                 }
563             }
564         }
565 
566         /**
567          * calculate goal nanoTime from startTime and current tick number,
568          * then wait until that goal has been reached.
569          * @return Long.MIN_VALUE if received a shutdown request,
570          * current time otherwise (with Long.MIN_VALUE changed by +1)
571          */
572         private long waitForNextTick() {
573             long deadline = tickDuration * (tick + 1);
574 
575             for (;;) {
576                 final long currentTime = System.nanoTime() - startTime;
577                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
578 
579                 if (sleepTimeMs <= 0) {
580                     if (currentTime == Long.MIN_VALUE) {
581                         return -Long.MAX_VALUE;
582                     } else {
583                         return currentTime;
584                     }
585                 }
586 
587                 // Check if we run on windows, as if thats the case we will need
588                 // to round the sleepTime as workaround for a bug that only affect
589                 // the JVM if it runs on windows.
590                 //
591                 // See https://github.com/netty/netty/issues/356
592                 if (PlatformDependent.isWindows()) {
593                     sleepTimeMs = sleepTimeMs / 10 * 10;
594                     if (sleepTimeMs == 0) {
595                         sleepTimeMs = 1;
596                     }
597                 }
598 
599                 try {
600                     Thread.sleep(sleepTimeMs);
601                 } catch (InterruptedException ignored) {
602                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
603                         return Long.MIN_VALUE;
604                     }
605                 }
606             }
607         }
608 
609         public Set<Timeout> unprocessedTimeouts() {
610             return Collections.unmodifiableSet(unprocessedTimeouts);
611         }
612     }
613 
614     private static final class HashedWheelTimeout implements Timeout, Runnable {
615 
616         private static final int ST_INIT = 0;
617         private static final int ST_CANCELLED = 1;
618         private static final int ST_EXPIRED = 2;
619         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
620                 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
621 
622         private final HashedWheelTimer timer;
623         private final TimerTask task;
624         private final long deadline;
625 
626         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
627         private volatile int state = ST_INIT;
628 
629         // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
630         // HashedWheelTimeout will be added to the correct HashedWheelBucket.
631         long remainingRounds;
632 
633         // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
634         // As only the workerThread will act on it there is no need for synchronization / volatile.
635         HashedWheelTimeout next;
636         HashedWheelTimeout prev;
637 
638         // The bucket to which the timeout was added
639         HashedWheelBucket bucket;
640 
641         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
642             this.timer = timer;
643             this.task = task;
644             this.deadline = deadline;
645         }
646 
647         @Override
648         public Timer timer() {
649             return timer;
650         }
651 
652         @Override
653         public TimerTask task() {
654             return task;
655         }
656 
657         @Override
658         public boolean cancel() {
659             // only update the state it will be removed from HashedWheelBucket on next tick.
660             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
661                 return false;
662             }
663             // If a task should be canceled we put this to another queue which will be processed on each tick.
664             // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
665             // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
666             timer.cancelledTimeouts.add(this);
667             return true;
668         }
669 
670         void remove() {
671             HashedWheelBucket bucket = this.bucket;
672             if (bucket != null) {
673                 bucket.remove(this);
674             } else {
675                 timer.pendingTimeouts.decrementAndGet();
676             }
677         }
678 
679         public boolean compareAndSetState(int expected, int state) {
680             return STATE_UPDATER.compareAndSet(this, expected, state);
681         }
682 
683         public int state() {
684             return state;
685         }
686 
687         @Override
688         public boolean isCancelled() {
689             return state() == ST_CANCELLED;
690         }
691 
692         @Override
693         public boolean isExpired() {
694             return state() == ST_EXPIRED;
695         }
696 
697         public void expire() {
698             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
699                 return;
700             }
701 
702             try {
703                 timer.taskExecutor.execute(this);
704             } catch (Throwable t) {
705                 if (logger.isWarnEnabled()) {
706                     logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
707                             + " for execution.", t);
708                 }
709             }
710         }
711 
712         @Override
713         public void run() {
714             try {
715                 task.run(this);
716             } catch (Throwable t) {
717                 if (logger.isWarnEnabled()) {
718                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
719                 }
720             }
721         }
722 
723         @Override
724         public String toString() {
725             final long currentTime = System.nanoTime();
726             long remaining = deadline - currentTime + timer.startTime;
727 
728             StringBuilder buf = new StringBuilder(192)
729                .append(simpleClassName(this))
730                .append('(')
731                .append("deadline: ");
732             if (remaining > 0) {
733                 buf.append(remaining)
734                    .append(" ns later");
735             } else if (remaining < 0) {
736                 buf.append(-remaining)
737                    .append(" ns ago");
738             } else {
739                 buf.append("now");
740             }
741 
742             if (isCancelled()) {
743                 buf.append(", cancelled");
744             }
745 
746             return buf.append(", task: ")
747                       .append(task())
748                       .append(')')
749                       .toString();
750         }
751     }
752 
753     /**
754      * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
755      * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
756      * extra object creation is needed.
757      */
758     private static final class HashedWheelBucket {
759         // Used for the linked-list datastructure
760         private HashedWheelTimeout head;
761         private HashedWheelTimeout tail;
762 
763         /**
764          * Add {@link HashedWheelTimeout} to this bucket.
765          */
766         public void addTimeout(HashedWheelTimeout timeout) {
767             assert timeout.bucket == null;
768             timeout.bucket = this;
769             if (head == null) {
770                 head = tail = timeout;
771             } else {
772                 tail.next = timeout;
773                 timeout.prev = tail;
774                 tail = timeout;
775             }
776         }
777 
778         /**
779          * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
780          */
781         public void expireTimeouts(long deadline) {
782             HashedWheelTimeout timeout = head;
783 
784             // process all timeouts
785             while (timeout != null) {
786                 HashedWheelTimeout next = timeout.next;
787                 if (timeout.remainingRounds <= 0) {
788                     next = remove(timeout);
789                     if (timeout.deadline <= deadline) {
790                         timeout.expire();
791                     } else {
792                         // The timeout was placed into a wrong slot. This should never happen.
793                         throw new IllegalStateException(String.format(
794                                 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
795                     }
796                 } else if (timeout.isCancelled()) {
797                     next = remove(timeout);
798                 } else {
799                     timeout.remainingRounds --;
800                 }
801                 timeout = next;
802             }
803         }
804 
805         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
806             HashedWheelTimeout next = timeout.next;
807             // remove timeout that was either processed or cancelled by updating the linked-list
808             if (timeout.prev != null) {
809                 timeout.prev.next = next;
810             }
811             if (timeout.next != null) {
812                 timeout.next.prev = timeout.prev;
813             }
814 
815             if (timeout == head) {
816                 // if timeout is also the tail we need to adjust the entry too
817                 if (timeout == tail) {
818                     tail = null;
819                     head = null;
820                 } else {
821                     head = next;
822                 }
823             } else if (timeout == tail) {
824                 // if the timeout is the tail modify the tail to be the prev node.
825                 tail = timeout.prev;
826             }
827             // null out prev, next and bucket to allow for GC.
828             timeout.prev = null;
829             timeout.next = null;
830             timeout.bucket = null;
831             timeout.timer.pendingTimeouts.decrementAndGet();
832             return next;
833         }
834 
835         /**
836          * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
837          */
838         public void clearTimeouts(Set<Timeout> set) {
839             for (;;) {
840                 HashedWheelTimeout timeout = pollTimeout();
841                 if (timeout == null) {
842                     return;
843                 }
844                 if (timeout.isExpired() || timeout.isCancelled()) {
845                     continue;
846                 }
847                 set.add(timeout);
848             }
849         }
850 
851         private HashedWheelTimeout pollTimeout() {
852             HashedWheelTimeout head = this.head;
853             if (head == null) {
854                 return null;
855             }
856             HashedWheelTimeout next = head.next;
857             if (next == null) {
858                 tail = this.head =  null;
859             } else {
860                 this.head = next;
861                 next.prev = null;
862             }
863 
864             // null out prev and next to allow for GC.
865             head.next = null;
866             head.prev = null;
867             head.bucket = null;
868             return head;
869         }
870     }
871 }