View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import static io.netty.util.internal.ObjectUtil.checkInRange;
19  import static io.netty.util.internal.ObjectUtil.checkPositive;
20  import static io.netty.util.internal.ObjectUtil.checkNotNull;
21  
22  import io.netty.util.concurrent.ImmediateExecutor;
23  import io.netty.util.internal.MathUtil;
24  import io.netty.util.internal.PlatformDependent;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  import java.util.Collections;
29  import java.util.HashSet;
30  import java.util.Queue;
31  import java.util.Set;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.Executor;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.RejectedExecutionException;
36  import java.util.concurrent.ThreadFactory;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
41  import java.util.concurrent.atomic.AtomicLong;
42  
43  import static io.netty.util.internal.StringUtil.simpleClassName;
44  
45  /**
46   * A {@link Timer} optimized for approximated I/O timeout scheduling.
47   *
48   * <h3>Tick Duration</h3>
49   *
50   * As described with 'approximated', this timer does not execute the scheduled
51   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
52   * check if there are any {@link TimerTask}s behind the schedule and execute
53   * them.
54   * <p>
55   * You can increase or decrease the accuracy of the execution timing by
56   * specifying smaller or larger tick duration in the constructor.  In most
57   * network applications, I/O timeout does not need to be accurate.  Therefore,
58   * the default tick duration is 100 milliseconds and you will not need to try
59   * different configurations in most cases.
60   *
61   * <h3>Ticks per Wheel (Wheel Size)</h3>
62   *
63   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
64   * Put simply, a wheel is a hash table of {@link TimerTask}s whose hash
65   * function is 'deadline of the task'.  The default number of ticks per wheel
66   * (i.e. the size of the wheel) is 512.  You could specify a larger value
67   * if you are going to schedule a lot of timeouts.
68   *
69   * <h3>Do not create many instances.</h3>
70   *
71   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
72   * started.  Therefore, you should make sure to create only one instance and
73   * share it across your application.  One of the common mistakes, that makes
74   * your application unresponsive, is to create a new instance for every connection.
75   *
76   * <h3>Implementation Details</h3>
77   *
78   * {@link HashedWheelTimer} is based on
79   * <a href="https://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
80   * Tony Lauck's paper,
81   * <a href="https://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
82   * and Hierarchical Timing Wheels: data structures to efficiently implement a
83   * timer facility'</a>.  More comprehensive slides are located
84   * <a href="https://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
85   */
86  public class HashedWheelTimer implements Timer {
87  
88      static final InternalLogger logger =
89              InternalLoggerFactory.getInstance(HashedWheelTimer.class);
90  
91      private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
92      private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
93      private static final int INSTANCE_COUNT_LIMIT = 64;
94      private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
95      private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
96              .newResourceLeakDetector(HashedWheelTimer.class, 1);
97  
98      private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
99              AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
100 
101     private final ResourceLeakTracker<HashedWheelTimer> leak;
102     private final Worker worker = new Worker();
103     private final Thread workerThread;
104 
105     public static final int WORKER_STATE_INIT = 0;
106     public static final int WORKER_STATE_STARTED = 1;
107     public static final int WORKER_STATE_SHUTDOWN = 2;
108     @SuppressWarnings({"unused", "FieldMayBeFinal"})
109     private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
110 
111     private final long tickDuration;
112     private final HashedWheelBucket[] wheel;
113     private final int mask;
114     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
115     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
116     private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
117     private final AtomicLong pendingTimeouts = new AtomicLong(0);
118     private final long maxPendingTimeouts;
119     private final Executor taskExecutor;
120 
121     private volatile long startTime;
122 
123     /**
124      * Creates a new timer with the default thread factory
125      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
126      * default number of ticks per wheel.
127      */
128     public HashedWheelTimer() {
129         this(Executors.defaultThreadFactory());
130     }
131 
132     /**
133      * Creates a new timer with the default thread factory
134      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
135      * per wheel.
136      *
137      * @param tickDuration the duration between tick
138      * @param unit         the time unit of the {@code tickDuration}
139      * @throws NullPointerException     if {@code unit} is {@code null}
140      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
141      */
142     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
143         this(Executors.defaultThreadFactory(), tickDuration, unit);
144     }
145 
146     /**
147      * Creates a new timer with the default thread factory
148      * ({@link Executors#defaultThreadFactory()}).
149      *
150      * @param tickDuration  the duration between tick
151      * @param unit          the time unit of the {@code tickDuration}
152      * @param ticksPerWheel the size of the wheel
153      * @throws NullPointerException     if {@code unit} is {@code null}
154      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
155      */
156     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
157         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
158     }
159 
160     /**
161      * Creates a new timer with the default tick duration and default number of
162      * ticks per wheel.
163      *
164      * @param threadFactory a {@link ThreadFactory} that creates a
165      *                      background {@link Thread} which is dedicated to
166      *                      {@link TimerTask} execution.
167      * @throws NullPointerException if {@code threadFactory} is {@code null}
168      */
169     public HashedWheelTimer(ThreadFactory threadFactory) {
170         this(threadFactory, 100, TimeUnit.MILLISECONDS);
171     }
172 
173     /**
174      * Creates a new timer with the default number of ticks per wheel.
175      *
176      * @param threadFactory a {@link ThreadFactory} that creates a
177      *                      background {@link Thread} which is dedicated to
178      *                      {@link TimerTask} execution.
179      * @param tickDuration  the duration between tick
180      * @param unit          the time unit of the {@code tickDuration}
181      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
182      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
183      */
184     public HashedWheelTimer(
185             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
186         this(threadFactory, tickDuration, unit, 512);
187     }
188 
189     /**
190      * Creates a new timer.
191      *
192      * @param threadFactory a {@link ThreadFactory} that creates a
193      *                      background {@link Thread} which is dedicated to
194      *                      {@link TimerTask} execution.
195      * @param tickDuration  the duration between tick
196      * @param unit          the time unit of the {@code tickDuration}
197      * @param ticksPerWheel the size of the wheel
198      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
199      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
200      */
201     public HashedWheelTimer(
202             ThreadFactory threadFactory,
203             long tickDuration, TimeUnit unit, int ticksPerWheel) {
204         this(threadFactory, tickDuration, unit, ticksPerWheel, true);
205     }
206 
207     /**
208      * Creates a new timer.
209      *
210      * @param threadFactory a {@link ThreadFactory} that creates a
211      *                      background {@link Thread} which is dedicated to
212      *                      {@link TimerTask} execution.
213      * @param tickDuration  the duration between tick
214      * @param unit          the time unit of the {@code tickDuration}
215      * @param ticksPerWheel the size of the wheel
216      * @param leakDetection {@code true} if leak detection should be enabled always,
217      *                      if false it will only be enabled if the worker thread is not
218      *                      a daemon thread.
219      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
220      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
221      */
222     public HashedWheelTimer(
223             ThreadFactory threadFactory,
224             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
225         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
226     }
227 
228     /**
229      * Creates a new timer.
230      *
231      * @param threadFactory        a {@link ThreadFactory} that creates a
232      *                             background {@link Thread} which is dedicated to
233      *                             {@link TimerTask} execution.
234      * @param tickDuration         the duration between tick
235      * @param unit                 the time unit of the {@code tickDuration}
236      * @param ticksPerWheel        the size of the wheel
237      * @param leakDetection        {@code true} if leak detection should be enabled always,
238      *                             if false it will only be enabled if the worker thread is not
239      *                             a daemon thread.
240      * @param  maxPendingTimeouts  The maximum number of pending timeouts after which call to
241      *                             {@code newTimeout} will result in
242      *                             {@link java.util.concurrent.RejectedExecutionException}
243      *                             being thrown. No maximum pending timeouts limit is assumed if
244      *                             this value is 0 or negative.
245      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
246      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
247      */
248     public HashedWheelTimer(
249             ThreadFactory threadFactory,
250             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
251             long maxPendingTimeouts) {
252         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
253                 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
254     }
255     /**
256      * Creates a new timer.
257      *
258      * @param threadFactory        a {@link ThreadFactory} that creates a
259      *                             background {@link Thread} which is dedicated to
260      *                             {@link TimerTask} execution.
261      * @param tickDuration         the duration between tick
262      * @param unit                 the time unit of the {@code tickDuration}
263      * @param ticksPerWheel        the size of the wheel
264      * @param leakDetection        {@code true} if leak detection should be enabled always,
265      *                             if false it will only be enabled if the worker thread is not
266      *                             a daemon thread.
267      * @param maxPendingTimeouts   The maximum number of pending timeouts after which call to
268      *                             {@code newTimeout} will result in
269      *                             {@link java.util.concurrent.RejectedExecutionException}
270      *                             being thrown. No maximum pending timeouts limit is assumed if
271      *                             this value is 0 or negative.
272      * @param taskExecutor         The {@link Executor} that is used to execute the submitted {@link TimerTask}s.
273      *                             The caller is responsible to shutdown the {@link Executor} once it is not needed
274      *                             anymore.
275      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
276      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
277      */
278     public HashedWheelTimer(
279             ThreadFactory threadFactory,
280             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
281             long maxPendingTimeouts, Executor taskExecutor) {
282 
283         checkNotNull(threadFactory, "threadFactory");
284         checkNotNull(unit, "unit");
285         checkPositive(tickDuration, "tickDuration");
286         checkPositive(ticksPerWheel, "ticksPerWheel");
287         this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
288 
289         // Normalize ticksPerWheel to power of two and initialize the wheel.
290         wheel = createWheel(ticksPerWheel);
291         mask = wheel.length - 1;
292 
293         // Convert tickDuration to nanos.
294         long duration = unit.toNanos(tickDuration);
295 
296         // Prevent overflow.
297         if (duration >= Long.MAX_VALUE / wheel.length) {
298             throw new IllegalArgumentException(String.format(
299                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
300                     tickDuration, Long.MAX_VALUE / wheel.length));
301         }
302 
303         if (duration < MILLISECOND_NANOS) {
304             logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
305                         tickDuration, MILLISECOND_NANOS);
306             this.tickDuration = MILLISECOND_NANOS;
307         } else {
308             this.tickDuration = duration;
309         }
310 
311         workerThread = threadFactory.newThread(worker);
312 
313         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
314 
315         this.maxPendingTimeouts = maxPendingTimeouts;
316 
317         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
318             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
319             reportTooManyInstances();
320         }
321     }
322 
323     @Override
324     protected void finalize() throws Throwable {
325         try {
326             super.finalize();
327         } finally {
328             // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
329             // we have not yet shutdown then we want to make sure we decrement the active instance count.
330             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
331                 INSTANCE_COUNTER.decrementAndGet();
332             }
333         }
334     }
335 
336     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
337         ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
338 
339         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
340         for (int i = 0; i < wheel.length; i ++) {
341             wheel[i] = new HashedWheelBucket();
342         }
343         return wheel;
344     }
345 
346     /**
347      * Starts the background thread explicitly.  The background thread will
348      * start automatically on demand even if you did not call this method.
349      *
350      * @throws IllegalStateException if this timer has been
351      *                               {@linkplain #stop() stopped} already
352      */
353     public void start() {
354         switch (WORKER_STATE_UPDATER.get(this)) {
355             case WORKER_STATE_INIT:
356                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
357                     workerThread.start();
358                 }
359                 break;
360             case WORKER_STATE_STARTED:
361                 break;
362             case WORKER_STATE_SHUTDOWN:
363                 throw new IllegalStateException("cannot be started once stopped");
364             default:
365                 throw new Error("Invalid WorkerState");
366         }
367 
368         // Wait until the startTime is initialized by the worker.
369         while (startTime == 0) {
370             try {
371                 startTimeInitialized.await();
372             } catch (InterruptedException ignore) {
373                 // Ignore - it will be ready very soon.
374             }
375         }
376     }
377 
378     @Override
379     public Set<Timeout> stop() {
380         if (Thread.currentThread() == workerThread) {
381             throw new IllegalStateException(
382                     HashedWheelTimer.class.getSimpleName() +
383                             ".stop() cannot be called from " +
384                             TimerTask.class.getSimpleName());
385         }
386 
387         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
388             // workerState can be 0 or 2 at this moment - let it always be 2.
389             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
390                 INSTANCE_COUNTER.decrementAndGet();
391                 if (leak != null) {
392                     boolean closed = leak.close(this);
393                     assert closed;
394                 }
395             }
396 
397             return Collections.emptySet();
398         }
399 
400         try {
401             boolean interrupted = false;
402             while (workerThread.isAlive()) {
403                 workerThread.interrupt();
404                 try {
405                     workerThread.join(100);
406                 } catch (InterruptedException ignored) {
407                     interrupted = true;
408                 }
409             }
410 
411             if (interrupted) {
412                 Thread.currentThread().interrupt();
413             }
414         } finally {
415             INSTANCE_COUNTER.decrementAndGet();
416             if (leak != null) {
417                 boolean closed = leak.close(this);
418                 assert closed;
419             }
420         }
421         Set<Timeout> unprocessed = worker.unprocessedTimeouts();
422         Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
423         for (Timeout timeout : unprocessed) {
424             if (timeout.cancel()) {
425                 cancelled.add(timeout);
426             }
427         }
428         return cancelled;
429     }
430 
431     @Override
432     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
433         checkNotNull(task, "task");
434         checkNotNull(unit, "unit");
435 
436         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
437 
438         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
439             pendingTimeouts.decrementAndGet();
440             throw new RejectedExecutionException("Number of pending timeouts ("
441                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
442                 + "timeouts (" + maxPendingTimeouts + ")");
443         }
444 
445         start();
446 
447         // Add the timeout to the timeout queue which will be processed on the next tick.
448         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
449         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
450 
451         // Guard against overflow.
452         if (delay > 0 && deadline < 0) {
453             deadline = Long.MAX_VALUE;
454         }
455         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
456         timeouts.add(timeout);
457         return timeout;
458     }
459 
460     /**
461      * Returns the number of pending timeouts of this {@link Timer}.
462      */
463     public long pendingTimeouts() {
464         return pendingTimeouts.get();
465     }
466 
467     private static void reportTooManyInstances() {
468         if (logger.isErrorEnabled()) {
469             String resourceType = simpleClassName(HashedWheelTimer.class);
470             logger.error("You are creating too many " + resourceType + " instances. " +
471                     resourceType + " is a shared resource that must be reused across the JVM, " +
472                     "so that only a few instances are created.");
473         }
474     }
475 
476     private final class Worker implements Runnable {
477         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
478 
479         private long tick;
480 
481         @Override
482         public void run() {
483             // Initialize the startTime.
484             startTime = System.nanoTime();
485             if (startTime == 0) {
486                 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
487                 startTime = 1;
488             }
489 
490             // Notify the other threads waiting for the initialization at start().
491             startTimeInitialized.countDown();
492 
493             do {
494                 final long deadline = waitForNextTick();
495                 if (deadline > 0) {
496                     int idx = (int) (tick & mask);
497                     processCancelledTasks();
498                     HashedWheelBucket bucket =
499                             wheel[idx];
500                     transferTimeoutsToBuckets();
501                     bucket.expireTimeouts(deadline);
502                     tick++;
503                 }
504             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
505 
506             // Fill the unprocessedTimeouts so we can return them from stop() method.
507             for (HashedWheelBucket bucket: wheel) {
508                 bucket.clearTimeouts(unprocessedTimeouts);
509             }
510             for (;;) {
511                 HashedWheelTimeout timeout = timeouts.poll();
512                 if (timeout == null) {
513                     break;
514                 }
515                 if (!timeout.isCancelled()) {
516                     unprocessedTimeouts.add(timeout);
517                 }
518             }
519             processCancelledTasks();
520         }
521 
522         private void transferTimeoutsToBuckets() {
523             // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
524             // adds new timeouts in a loop.
525             for (int i = 0; i < 100000; i++) {
526                 HashedWheelTimeout timeout = timeouts.poll();
527                 if (timeout == null) {
528                     // all processed
529                     break;
530                 }
531                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
532                     // Was cancelled in the meantime.
533                     continue;
534                 }
535 
536                 long calculated = timeout.deadline / tickDuration;
537                 timeout.remainingRounds = (calculated - tick) / wheel.length;
538 
539                 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
540                 int stopIndex = (int) (ticks & mask);
541 
542                 HashedWheelBucket bucket = wheel[stopIndex];
543                 bucket.addTimeout(timeout);
544             }
545         }
546 
547         private void processCancelledTasks() {
548             for (;;) {
549                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
550                 if (timeout == null) {
551                     // all processed
552                     break;
553                 }
554                 try {
555                     timeout.remove();
556                 } catch (Throwable t) {
557                     if (logger.isWarnEnabled()) {
558                         logger.warn("An exception was thrown while process a cancellation task", t);
559                     }
560                 }
561             }
562         }
563 
564         /**
565          * calculate goal nanoTime from startTime and current tick number,
566          * then wait until that goal has been reached.
567          * @return Long.MIN_VALUE if received a shutdown request,
568          * current time otherwise (with Long.MIN_VALUE changed by +1)
569          */
570         private long waitForNextTick() {
571             long deadline = tickDuration * (tick + 1);
572 
573             for (;;) {
574                 final long currentTime = System.nanoTime() - startTime;
575                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
576 
577                 if (sleepTimeMs <= 0) {
578                     if (currentTime == Long.MIN_VALUE) {
579                         return -Long.MAX_VALUE;
580                     } else {
581                         return currentTime;
582                     }
583                 }
584 
585                 // Check if we run on windows, as if thats the case we will need
586                 // to round the sleepTime as workaround for a bug that only affect
587                 // the JVM if it runs on windows.
588                 //
589                 // See https://github.com/netty/netty/issues/356
590                 if (PlatformDependent.isWindows()) {
591                     sleepTimeMs = sleepTimeMs / 10 * 10;
592                     if (sleepTimeMs == 0) {
593                         sleepTimeMs = 1;
594                     }
595                 }
596 
597                 try {
598                     Thread.sleep(sleepTimeMs);
599                 } catch (InterruptedException ignored) {
600                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
601                         return Long.MIN_VALUE;
602                     }
603                 }
604             }
605         }
606 
607         public Set<Timeout> unprocessedTimeouts() {
608             return Collections.unmodifiableSet(unprocessedTimeouts);
609         }
610     }
611 
612     private static final class HashedWheelTimeout implements Timeout, Runnable {
613 
614         private static final int ST_INIT = 0;
615         private static final int ST_CANCELLED = 1;
616         private static final int ST_EXPIRED = 2;
617         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
618                 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
619 
620         private final HashedWheelTimer timer;
621         private final TimerTask task;
622         private final long deadline;
623 
624         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
625         private volatile int state = ST_INIT;
626 
627         // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
628         // HashedWheelTimeout will be added to the correct HashedWheelBucket.
629         long remainingRounds;
630 
631         // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
632         // As only the workerThread will act on it there is no need for synchronization / volatile.
633         HashedWheelTimeout next;
634         HashedWheelTimeout prev;
635 
636         // The bucket to which the timeout was added
637         HashedWheelBucket bucket;
638 
639         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
640             this.timer = timer;
641             this.task = task;
642             this.deadline = deadline;
643         }
644 
645         @Override
646         public Timer timer() {
647             return timer;
648         }
649 
650         @Override
651         public TimerTask task() {
652             return task;
653         }
654 
655         @Override
656         public boolean cancel() {
657             // only update the state it will be removed from HashedWheelBucket on next tick.
658             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
659                 return false;
660             }
661             // If a task should be canceled we put this to another queue which will be processed on each tick.
662             // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
663             // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
664             timer.cancelledTimeouts.add(this);
665             return true;
666         }
667 
668         void remove() {
669             HashedWheelBucket bucket = this.bucket;
670             if (bucket != null) {
671                 bucket.remove(this);
672             } else {
673                 timer.pendingTimeouts.decrementAndGet();
674             }
675         }
676 
677         public boolean compareAndSetState(int expected, int state) {
678             return STATE_UPDATER.compareAndSet(this, expected, state);
679         }
680 
681         public int state() {
682             return state;
683         }
684 
685         @Override
686         public boolean isCancelled() {
687             return state() == ST_CANCELLED;
688         }
689 
690         @Override
691         public boolean isExpired() {
692             return state() == ST_EXPIRED;
693         }
694 
695         public void expire() {
696             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
697                 return;
698             }
699 
700             try {
701                 timer.taskExecutor.execute(this);
702             } catch (Throwable t) {
703                 if (logger.isWarnEnabled()) {
704                     logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
705                             + " for execution.", t);
706                 }
707             }
708         }
709 
710         @Override
711         public void run() {
712             try {
713                 task.run(this);
714             } catch (Throwable t) {
715                 if (logger.isWarnEnabled()) {
716                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
717                 }
718             }
719         }
720 
721         @Override
722         public String toString() {
723             final long currentTime = System.nanoTime();
724             long remaining = deadline - currentTime + timer.startTime;
725 
726             StringBuilder buf = new StringBuilder(192)
727                .append(simpleClassName(this))
728                .append('(')
729                .append("deadline: ");
730             if (remaining > 0) {
731                 buf.append(remaining)
732                    .append(" ns later");
733             } else if (remaining < 0) {
734                 buf.append(-remaining)
735                    .append(" ns ago");
736             } else {
737                 buf.append("now");
738             }
739 
740             if (isCancelled()) {
741                 buf.append(", cancelled");
742             }
743 
744             return buf.append(", task: ")
745                       .append(task())
746                       .append(')')
747                       .toString();
748         }
749     }
750 
751     /**
752      * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
753      * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
754      * extra object creation is needed.
755      */
756     private static final class HashedWheelBucket {
757         // Used for the linked-list datastructure
758         private HashedWheelTimeout head;
759         private HashedWheelTimeout tail;
760 
761         /**
762          * Add {@link HashedWheelTimeout} to this bucket.
763          */
764         public void addTimeout(HashedWheelTimeout timeout) {
765             assert timeout.bucket == null;
766             timeout.bucket = this;
767             if (head == null) {
768                 head = tail = timeout;
769             } else {
770                 tail.next = timeout;
771                 timeout.prev = tail;
772                 tail = timeout;
773             }
774         }
775 
776         /**
777          * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
778          */
779         public void expireTimeouts(long deadline) {
780             HashedWheelTimeout timeout = head;
781 
782             // process all timeouts
783             while (timeout != null) {
784                 HashedWheelTimeout next = timeout.next;
785                 if (timeout.remainingRounds <= 0) {
786                     next = remove(timeout);
787                     if (timeout.deadline <= deadline) {
788                         timeout.expire();
789                     } else {
790                         // The timeout was placed into a wrong slot. This should never happen.
791                         throw new IllegalStateException(String.format(
792                                 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
793                     }
794                 } else if (timeout.isCancelled()) {
795                     next = remove(timeout);
796                 } else {
797                     timeout.remainingRounds --;
798                 }
799                 timeout = next;
800             }
801         }
802 
803         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
804             HashedWheelTimeout next = timeout.next;
805             // remove timeout that was either processed or cancelled by updating the linked-list
806             if (timeout.prev != null) {
807                 timeout.prev.next = next;
808             }
809             if (timeout.next != null) {
810                 timeout.next.prev = timeout.prev;
811             }
812 
813             if (timeout == head) {
814                 // if timeout is also the tail we need to adjust the entry too
815                 if (timeout == tail) {
816                     tail = null;
817                     head = null;
818                 } else {
819                     head = next;
820                 }
821             } else if (timeout == tail) {
822                 // if the timeout is the tail modify the tail to be the prev node.
823                 tail = timeout.prev;
824             }
825             // null out prev, next and bucket to allow for GC.
826             timeout.prev = null;
827             timeout.next = null;
828             timeout.bucket = null;
829             timeout.timer.pendingTimeouts.decrementAndGet();
830             return next;
831         }
832 
833         /**
834          * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
835          */
836         public void clearTimeouts(Set<Timeout> set) {
837             for (;;) {
838                 HashedWheelTimeout timeout = pollTimeout();
839                 if (timeout == null) {
840                     return;
841                 }
842                 if (timeout.isExpired() || timeout.isCancelled()) {
843                     continue;
844                 }
845                 set.add(timeout);
846             }
847         }
848 
849         private HashedWheelTimeout pollTimeout() {
850             HashedWheelTimeout head = this.head;
851             if (head == null) {
852                 return null;
853             }
854             HashedWheelTimeout next = head.next;
855             if (next == null) {
856                 tail = this.head =  null;
857             } else {
858                 this.head = next;
859                 next.prev = null;
860             }
861 
862             // null out prev and next to allow for GC.
863             head.next = null;
864             head.prev = null;
865             head.bucket = null;
866             return head;
867         }
868     }
869 }