View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util;
17  
18  import io.netty5.util.concurrent.ImmediateExecutor;
19  import io.netty5.util.internal.PlatformDependent;
20  import io.netty5.util.internal.logging.InternalLogger;
21  import io.netty5.util.internal.logging.InternalLoggerFactory;
22  
23  import java.util.Collections;
24  import java.util.HashSet;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.RejectedExecutionException;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
36  import java.util.concurrent.atomic.AtomicLong;
37  
38  import static io.netty5.util.internal.ObjectUtil.checkInRange;
39  import static io.netty5.util.internal.ObjectUtil.checkPositive;
40  import static io.netty5.util.internal.StringUtil.simpleClassName;
41  import static java.util.Objects.requireNonNull;
42  
43  /**
44   * A {@link Timer} optimized for approximated I/O timeout scheduling.
45   *
46   * <h3>Tick Duration</h3>
47   *
48   * As described with 'approximated', this timer does not execute the scheduled
49   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
50   * check if there are any {@link TimerTask}s behind the schedule and execute
51   * them.
52   * <p>
53   * You can increase or decrease the accuracy of the execution timing by
54   * specifying smaller or larger tick duration in the constructor.  In most
55   * network applications, I/O timeout does not need to be accurate.  Therefore,
56   * the default tick duration is 100 milliseconds and you will not need to try
57   * different configurations in most cases.
58   *
59   * <h3>Ticks per Wheel (Wheel Size)</h3>
60   *
61   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
62   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
63   * function is 'dead line of the task'.  The default number of ticks per wheel
64   * (i.e. the size of the wheel) is 512.  You could specify a larger value
65   * if you are going to schedule a lot of timeouts.
66   *
67   * <h3>Do not create many instances.</h3>
68   *
69   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
70   * started.  Therefore, you should make sure to create only one instance and
71   * share it across your application.  One of the common mistakes, that makes
72   * your application unresponsive, is to create a new instance for every connection.
73   *
74   * <h3>Implementation Details</h3>
75   *
76   * {@link HashedWheelTimer} is based on
77   * <a href="https://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
78   * Tony Lauck's paper,
79   * <a href="https://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
80   * and Hierarchical Timing Wheels: data structures to efficiently implement a
81   * timer facility'</a>.  More comprehensive slides are located
82   * <a href="https://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
83   */
84  public class HashedWheelTimer implements Timer {
85  
86      static final InternalLogger logger =
87              InternalLoggerFactory.getInstance(HashedWheelTimer.class);
88  
89      private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
90      private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
91      private static final int INSTANCE_COUNT_LIMIT = 64;
92      private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
93      private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
94              .newResourceLeakDetector(HashedWheelTimer.class, 1);
95  
96      private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
97              AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
98  
99      private final ResourceLeakTracker<HashedWheelTimer> leak;
100     private final Worker worker = new Worker();
101     private final Thread workerThread;
102 
103     public static final int WORKER_STATE_INIT = 0;
104     public static final int WORKER_STATE_STARTED = 1;
105     public static final int WORKER_STATE_SHUTDOWN = 2;
106     @SuppressWarnings({"unused", "FieldMayBeFinal"})
107     private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
108 
109     private final long tickDuration;
110     private final HashedWheelBucket[] wheel;
111     private final int mask;
112     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
113     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
114     private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
115     private final AtomicLong pendingTimeouts = new AtomicLong(0);
116     private final long maxPendingTimeouts;
117     private final Executor taskExecutor;
118 
119     private volatile long startTime;
120 
121     /**
122      * Creates a new timer with the default thread factory
123      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
124      * default number of ticks per wheel.
125      */
126     public HashedWheelTimer() {
127         this(Executors.defaultThreadFactory());
128     }
129 
130     /**
131      * Creates a new timer with the default thread factory
132      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
133      * per wheel.
134      *
135      * @param tickDuration the duration between tick
136      * @param unit         the time unit of the {@code tickDuration}
137      * @throws NullPointerException     if {@code unit} is {@code null}
138      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
139      */
140     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
141         this(Executors.defaultThreadFactory(), tickDuration, unit);
142     }
143 
144     /**
145      * Creates a new timer with the default thread factory
146      * ({@link Executors#defaultThreadFactory()}).
147      *
148      * @param tickDuration  the duration between tick
149      * @param unit          the time unit of the {@code tickDuration}
150      * @param ticksPerWheel the size of the wheel
151      * @throws NullPointerException     if {@code unit} is {@code null}
152      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
153      */
154     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
155         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
156     }
157 
158     /**
159      * Creates a new timer with the default tick duration and default number of
160      * ticks per wheel.
161      *
162      * @param threadFactory a {@link ThreadFactory} that creates a
163      *                      background {@link Thread} which is dedicated to
164      *                      {@link TimerTask} execution.
165      * @throws NullPointerException if {@code threadFactory} is {@code null}
166      */
167     public HashedWheelTimer(ThreadFactory threadFactory) {
168         this(threadFactory, 100, TimeUnit.MILLISECONDS);
169     }
170 
171     /**
172      * Creates a new timer with the default number of ticks per wheel.
173      *
174      * @param threadFactory a {@link ThreadFactory} that creates a
175      *                      background {@link Thread} which is dedicated to
176      *                      {@link TimerTask} execution.
177      * @param tickDuration  the duration between tick
178      * @param unit          the time unit of the {@code tickDuration}
179      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
180      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
181      */
182     public HashedWheelTimer(
183             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
184         this(threadFactory, tickDuration, unit, 512);
185     }
186 
187     /**
188      * Creates a new timer.
189      *
190      * @param threadFactory a {@link ThreadFactory} that creates a
191      *                      background {@link Thread} which is dedicated to
192      *                      {@link TimerTask} execution.
193      * @param tickDuration  the duration between tick
194      * @param unit          the time unit of the {@code tickDuration}
195      * @param ticksPerWheel the size of the wheel
196      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
197      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
198      */
199     public HashedWheelTimer(
200             ThreadFactory threadFactory,
201             long tickDuration, TimeUnit unit, int ticksPerWheel) {
202         this(threadFactory, tickDuration, unit, ticksPerWheel, true);
203     }
204 
205     /**
206      * Creates a new timer.
207      *
208      * @param threadFactory a {@link ThreadFactory} that creates a
209      *                      background {@link Thread} which is dedicated to
210      *                      {@link TimerTask} execution.
211      * @param tickDuration  the duration between tick
212      * @param unit          the time unit of the {@code tickDuration}
213      * @param ticksPerWheel the size of the wheel
214      * @param leakDetection {@code true} if leak detection should be enabled always,
215      *                      if false it will only be enabled if the worker thread is not
216      *                      a daemon thread.
217      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
218      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
219      */
220     public HashedWheelTimer(
221             ThreadFactory threadFactory,
222             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
223         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
224     }
225 
226     /**
227      * Creates a new timer.
228      *
229      * @param threadFactory        a {@link ThreadFactory} that creates a
230      *                             background {@link Thread} which is dedicated to
231      *                             {@link TimerTask} execution.
232      * @param tickDuration         the duration between tick
233      * @param unit                 the time unit of the {@code tickDuration}
234      * @param ticksPerWheel        the size of the wheel
235      * @param leakDetection        {@code true} if leak detection should be enabled always,
236      *                             if false it will only be enabled if the worker thread is not
237      *                             a daemon thread.
238      * @param  maxPendingTimeouts  The maximum number of pending timeouts after which call to
239      *                             {@code newTimeout} will result in
240      *                             {@link java.util.concurrent.RejectedExecutionException}
241      *                             being thrown. No maximum pending timeouts limit is assumed if
242      *                             this value is 0 or negative.
243      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
244      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
245      */
246     public HashedWheelTimer(
247             ThreadFactory threadFactory,
248             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
249             long maxPendingTimeouts) {
250         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
251                 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
252     }
253     /**
254      * Creates a new timer.
255      *
256      * @param threadFactory        a {@link ThreadFactory} that creates a
257      *                             background {@link Thread} which is dedicated to
258      *                             {@link TimerTask} execution.
259      * @param tickDuration         the duration between tick
260      * @param unit                 the time unit of the {@code tickDuration}
261      * @param ticksPerWheel        the size of the wheel
262      * @param leakDetection        {@code true} if leak detection should be enabled always,
263      *                             if false it will only be enabled if the worker thread is not
264      *                             a daemon thread.
265      * @param maxPendingTimeouts   The maximum number of pending timeouts after which call to
266      *                             {@code newTimeout} will result in
267      *                             {@link java.util.concurrent.RejectedExecutionException}
268      *                             being thrown. No maximum pending timeouts limit is assumed if
269      *                             this value is 0 or negative.
270      * @param taskExecutor         The {@link Executor} that is used to execute the submitted {@link TimerTask}s.
271      *                             The caller is responsible to shutdown the {@link Executor} once it is not needed
272      *                             anymore.
273      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
274      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
275      */
276     public HashedWheelTimer(
277             ThreadFactory threadFactory,
278             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
279             long maxPendingTimeouts, Executor taskExecutor) {
280 
281         requireNonNull(threadFactory, "threadFactory");
282         requireNonNull(unit, "unit");
283         checkPositive(tickDuration, "tickDuration");
284         checkPositive(ticksPerWheel, "ticksPerWheel");
285         this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor");
286 
287         // Normalize ticksPerWheel to power of two and initialize the wheel.
288         wheel = createWheel(ticksPerWheel);
289         mask = wheel.length - 1;
290 
291         // Convert tickDuration to nanos.
292         long duration = unit.toNanos(tickDuration);
293 
294         // Prevent overflow.
295         if (duration >= Long.MAX_VALUE / wheel.length) {
296             throw new IllegalArgumentException(String.format(
297                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
298                     tickDuration, Long.MAX_VALUE / wheel.length));
299         }
300 
301         if (duration < MILLISECOND_NANOS) {
302             logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
303                         tickDuration, MILLISECOND_NANOS);
304             this.tickDuration = MILLISECOND_NANOS;
305         } else {
306             this.tickDuration = duration;
307         }
308 
309         workerThread = threadFactory.newThread(worker);
310 
311         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
312 
313         this.maxPendingTimeouts = maxPendingTimeouts;
314 
315         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
316             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
317             reportTooManyInstances();
318         }
319     }
320 
321     @Override
322     protected void finalize() throws Throwable {
323         try {
324             super.finalize();
325         } finally {
326             // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
327             // we have not yet shutdown then we want to make sure we decrement the active instance count.
328             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
329                 INSTANCE_COUNTER.decrementAndGet();
330             }
331         }
332     }
333 
334     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
335         //ticksPerWheel may not be greater than 2^30
336         checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
337 
338         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
339         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
340         for (int i = 0; i < wheel.length; i ++) {
341             wheel[i] = new HashedWheelBucket();
342         }
343         return wheel;
344     }
345 
346     private static int normalizeTicksPerWheel(int ticksPerWheel) {
347         int normalizedTicksPerWheel = 1;
348         while (normalizedTicksPerWheel < ticksPerWheel) {
349             normalizedTicksPerWheel <<= 1;
350         }
351         return normalizedTicksPerWheel;
352     }
353 
354     /**
355      * Starts the background thread explicitly.  The background thread will
356      * start automatically on demand even if you did not call this method.
357      *
358      * @throws IllegalStateException if this timer has been
359      *                               {@linkplain #stop() stopped} already
360      */
361     public void start() {
362         switch (WORKER_STATE_UPDATER.get(this)) {
363             case WORKER_STATE_INIT:
364                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
365                     workerThread.start();
366                 }
367                 break;
368             case WORKER_STATE_STARTED:
369                 break;
370             case WORKER_STATE_SHUTDOWN:
371                 throw new IllegalStateException("cannot be started once stopped");
372             default:
373                 throw new Error("Invalid WorkerState");
374         }
375 
376         // Wait until the startTime is initialized by the worker.
377         while (startTime == 0) {
378             try {
379                 startTimeInitialized.await();
380             } catch (InterruptedException ignore) {
381                 // Ignore - it will be ready very soon.
382             }
383         }
384     }
385 
386     @Override
387     public Set<Timeout> stop() {
388         if (Thread.currentThread() == workerThread) {
389             throw new IllegalStateException(
390                     HashedWheelTimer.class.getSimpleName() +
391                             ".stop() cannot be called from " +
392                             TimerTask.class.getSimpleName());
393         }
394 
395         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
396             // workerState can be 0 or 2 at this moment - let it always be 2.
397             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
398                 INSTANCE_COUNTER.decrementAndGet();
399                 if (leak != null) {
400                     boolean closed = leak.close(this);
401                     assert closed;
402                 }
403             }
404 
405             return Collections.emptySet();
406         }
407 
408         try {
409             boolean interrupted = false;
410             while (workerThread.isAlive()) {
411                 workerThread.interrupt();
412                 try {
413                     workerThread.join(100);
414                 } catch (InterruptedException ignored) {
415                     interrupted = true;
416                 }
417             }
418 
419             if (interrupted) {
420                 Thread.currentThread().interrupt();
421             }
422         } finally {
423             INSTANCE_COUNTER.decrementAndGet();
424             if (leak != null) {
425                 boolean closed = leak.close(this);
426                 assert closed;
427             }
428         }
429         return worker.unprocessedTimeouts();
430     }
431 
432     @Override
433     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
434         requireNonNull(task, "task");
435         requireNonNull(unit, "unit");
436 
437         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
438 
439         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
440             pendingTimeouts.decrementAndGet();
441             throw new RejectedExecutionException("Number of pending timeouts ("
442                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
443                 + "timeouts (" + maxPendingTimeouts + ")");
444         }
445 
446         start();
447 
448         // Add the timeout to the timeout queue which will be processed on the next tick.
449         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
450         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
451 
452         // Guard against overflow.
453         if (delay > 0 && deadline < 0) {
454             deadline = Long.MAX_VALUE;
455         }
456         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
457         timeouts.add(timeout);
458         return timeout;
459     }
460 
461     /**
462      * Returns the number of pending timeouts of this {@link Timer}.
463      */
464     public long pendingTimeouts() {
465         return pendingTimeouts.get();
466     }
467 
468     private static void reportTooManyInstances() {
469         if (logger.isErrorEnabled()) {
470             String resourceType = simpleClassName(HashedWheelTimer.class);
471             logger.error("You are creating too many " + resourceType + " instances. " +
472                     resourceType + " is a shared resource that must be reused across the JVM, " +
473                     "so that only a few instances are created.");
474         }
475     }
476 
477     private final class Worker implements Runnable {
478         private final Set<Timeout> unprocessedTimeouts = new HashSet<>();
479 
480         private long tick;
481 
482         @Override
483         public void run() {
484             // Initialize the startTime.
485             startTime = System.nanoTime();
486             if (startTime == 0) {
487                 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
488                 startTime = 1;
489             }
490 
491             // Notify the other threads waiting for the initialization at start().
492             startTimeInitialized.countDown();
493 
494             do {
495                 final long deadline = waitForNextTick();
496                 if (deadline > 0) {
497                     int idx = (int) (tick & mask);
498                     processCancelledTasks();
499                     HashedWheelBucket bucket =
500                             wheel[idx];
501                     transferTimeoutsToBuckets();
502                     bucket.expireTimeouts(deadline);
503                     tick++;
504                 }
505             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
506 
507             // Fill the unprocessedTimeouts so we can return them from stop() method.
508             for (HashedWheelBucket bucket: wheel) {
509                 bucket.clearTimeouts(unprocessedTimeouts);
510             }
511             for (;;) {
512                 HashedWheelTimeout timeout = timeouts.poll();
513                 if (timeout == null) {
514                     break;
515                 }
516                 if (!timeout.isCancelled()) {
517                     unprocessedTimeouts.add(timeout);
518                 }
519             }
520             processCancelledTasks();
521         }
522 
523         private void transferTimeoutsToBuckets() {
524             // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
525             // adds new timeouts in a loop.
526             for (int i = 0; i < 100000; i++) {
527                 HashedWheelTimeout timeout = timeouts.poll();
528                 if (timeout == null) {
529                     // all processed
530                     break;
531                 }
532                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
533                     // Was cancelled in the meantime.
534                     continue;
535                 }
536 
537                 long calculated = timeout.deadline / tickDuration;
538                 timeout.remainingRounds = (calculated - tick) / wheel.length;
539 
540                 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
541                 int stopIndex = (int) (ticks & mask);
542 
543                 HashedWheelBucket bucket = wheel[stopIndex];
544                 bucket.addTimeout(timeout);
545             }
546         }
547 
548         private void processCancelledTasks() {
549             for (;;) {
550                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
551                 if (timeout == null) {
552                     // all processed
553                     break;
554                 }
555                 try {
556                     timeout.remove();
557                 } catch (Throwable t) {
558                     if (logger.isWarnEnabled()) {
559                         logger.warn("An exception was thrown while process a cancellation task", t);
560                     }
561                 }
562             }
563         }
564 
565         /**
566          * calculate goal nanoTime from startTime and current tick number,
567          * then wait until that goal has been reached.
568          * @return Long.MIN_VALUE if received a shutdown request,
569          * current time otherwise (with Long.MIN_VALUE changed by +1)
570          */
571         private long waitForNextTick() {
572             long deadline = tickDuration * (tick + 1);
573 
574             for (;;) {
575                 final long currentTime = System.nanoTime() - startTime;
576                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
577 
578                 if (sleepTimeMs <= 0) {
579                     if (currentTime == Long.MIN_VALUE) {
580                         return -Long.MAX_VALUE;
581                     } else {
582                         return currentTime;
583                     }
584                 }
585 
586                 // Check if we run on windows, as if thats the case we will need
587                 // to round the sleepTime as workaround for a bug that only affect
588                 // the JVM if it runs on windows.
589                 //
590                 // See https://github.com/netty/netty/issues/356
591                 if (PlatformDependent.isWindows()) {
592                     sleepTimeMs = sleepTimeMs / 10 * 10;
593                     if (sleepTimeMs == 0) {
594                         sleepTimeMs = 1;
595                     }
596                 }
597 
598                 try {
599                     Thread.sleep(sleepTimeMs);
600                 } catch (InterruptedException ignored) {
601                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
602                         return Long.MIN_VALUE;
603                     }
604                 }
605             }
606         }
607 
608         public Set<Timeout> unprocessedTimeouts() {
609             return Collections.unmodifiableSet(unprocessedTimeouts);
610         }
611     }
612 
613     private static final class HashedWheelTimeout implements Timeout, Runnable {
614 
615         private static final int ST_INIT = 0;
616         private static final int ST_CANCELLED = 1;
617         private static final int ST_EXPIRED = 2;
618         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
619                 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
620 
621         private final HashedWheelTimer timer;
622         private final TimerTask task;
623         private final long deadline;
624 
625         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
626         private volatile int state = ST_INIT;
627 
628         // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
629         // HashedWheelTimeout will be added to the correct HashedWheelBucket.
630         long remainingRounds;
631 
632         // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
633         // As only the workerThread will act on it there is no need for synchronization / volatile.
634         HashedWheelTimeout next;
635         HashedWheelTimeout prev;
636 
637         // The bucket to which the timeout was added
638         HashedWheelBucket bucket;
639 
640         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
641             this.timer = timer;
642             this.task = task;
643             this.deadline = deadline;
644         }
645 
646         @Override
647         public Timer timer() {
648             return timer;
649         }
650 
651         @Override
652         public TimerTask task() {
653             return task;
654         }
655 
656         @Override
657         public boolean cancel() {
658             // only update the state it will be removed from HashedWheelBucket on next tick.
659             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
660                 return false;
661             }
662             // If a task should be canceled we put this to another queue which will be processed on each tick.
663             // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
664             // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
665             timer.cancelledTimeouts.add(this);
666             return true;
667         }
668 
669         void remove() {
670             HashedWheelBucket bucket = this.bucket;
671             if (bucket != null) {
672                 bucket.remove(this);
673             } else {
674                 timer.pendingTimeouts.decrementAndGet();
675             }
676         }
677 
678         public boolean compareAndSetState(int expected, int state) {
679             return STATE_UPDATER.compareAndSet(this, expected, state);
680         }
681 
682         public int state() {
683             return state;
684         }
685 
686         @Override
687         public boolean isCancelled() {
688             return state() == ST_CANCELLED;
689         }
690 
691         @Override
692         public boolean isExpired() {
693             return state() == ST_EXPIRED;
694         }
695 
696         public void expire() {
697             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
698                 return;
699             }
700 
701             try {
702                 timer.taskExecutor.execute(this);
703             } catch (Throwable t) {
704                 if (logger.isWarnEnabled()) {
705                     logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
706                             + " for execution.", t);
707                 }
708             }
709         }
710 
711         @Override
712         public void run() {
713             try {
714                 task.run(this);
715             } catch (Throwable t) {
716                 if (logger.isWarnEnabled()) {
717                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
718                 }
719             }
720         }
721 
722         @Override
723         public String toString() {
724             final long currentTime = System.nanoTime();
725             long remaining = deadline - currentTime + timer.startTime;
726 
727             StringBuilder buf = new StringBuilder(192)
728                .append(simpleClassName(this))
729                .append('(')
730                .append("deadline: ");
731             if (remaining > 0) {
732                 buf.append(remaining)
733                    .append(" ns later");
734             } else if (remaining < 0) {
735                 buf.append(-remaining)
736                    .append(" ns ago");
737             } else {
738                 buf.append("now");
739             }
740 
741             if (isCancelled()) {
742                 buf.append(", cancelled");
743             }
744 
745             return buf.append(", task: ")
746                       .append(task())
747                       .append(')')
748                       .toString();
749         }
750     }
751 
752     /**
753      * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
754      * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
755      * extra object creation is needed.
756      */
757     private static final class HashedWheelBucket {
758         // Used for the linked-list datastructure
759         private HashedWheelTimeout head;
760         private HashedWheelTimeout tail;
761 
762         /**
763          * Add {@link HashedWheelTimeout} to this bucket.
764          */
765         public void addTimeout(HashedWheelTimeout timeout) {
766             assert timeout.bucket == null;
767             timeout.bucket = this;
768             if (head == null) {
769                 head = tail = timeout;
770             } else {
771                 tail.next = timeout;
772                 timeout.prev = tail;
773                 tail = timeout;
774             }
775         }
776 
777         /**
778          * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
779          */
780         public void expireTimeouts(long deadline) {
781             HashedWheelTimeout timeout = head;
782 
783             // process all timeouts
784             while (timeout != null) {
785                 HashedWheelTimeout next = timeout.next;
786                 if (timeout.remainingRounds <= 0) {
787                     next = remove(timeout);
788                     if (timeout.deadline <= deadline) {
789                         timeout.expire();
790                     } else {
791                         // The timeout was placed into a wrong slot. This should never happen.
792                         throw new IllegalStateException(String.format(
793                                 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
794                     }
795                 } else if (timeout.isCancelled()) {
796                     next = remove(timeout);
797                 } else {
798                     timeout.remainingRounds --;
799                 }
800                 timeout = next;
801             }
802         }
803 
804         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
805             HashedWheelTimeout next = timeout.next;
806             // remove timeout that was either processed or cancelled by updating the linked-list
807             if (timeout.prev != null) {
808                 timeout.prev.next = next;
809             }
810             if (timeout.next != null) {
811                 timeout.next.prev = timeout.prev;
812             }
813 
814             if (timeout == head) {
815                 // if timeout is also the tail we need to adjust the entry too
816                 if (timeout == tail) {
817                     tail = null;
818                     head = null;
819                 } else {
820                     head = next;
821                 }
822             } else if (timeout == tail) {
823                 // if the timeout is the tail modify the tail to be the prev node.
824                 tail = timeout.prev;
825             }
826             // null out prev, next and bucket to allow for GC.
827             timeout.prev = null;
828             timeout.next = null;
829             timeout.bucket = null;
830             timeout.timer.pendingTimeouts.decrementAndGet();
831             return next;
832         }
833 
834         /**
835          * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
836          */
837         public void clearTimeouts(Set<Timeout> set) {
838             for (;;) {
839                 HashedWheelTimeout timeout = pollTimeout();
840                 if (timeout == null) {
841                     return;
842                 }
843                 if (timeout.isExpired() || timeout.isCancelled()) {
844                     continue;
845                 }
846                 set.add(timeout);
847             }
848         }
849 
850         private HashedWheelTimeout pollTimeout() {
851             HashedWheelTimeout head = this.head;
852             if (head == null) {
853                 return null;
854             }
855             HashedWheelTimeout next = head.next;
856             if (next == null) {
857                 tail = this.head =  null;
858             } else {
859                 this.head = next;
860                 next.prev = null;
861             }
862 
863             // null out prev and next to allow for GC.
864             head.next = null;
865             head.prev = null;
866             head.bucket = null;
867             return head;
868         }
869     }
870 }