View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositive;
19  import static io.netty.util.internal.ObjectUtil.checkNotNull;
20  
21  import io.netty.util.concurrent.ImmediateExecutor;
22  import io.netty.util.internal.MathUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.CountDownLatch;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import static io.netty.util.internal.StringUtil.simpleClassName;
43  
44  /**
45   * A {@link Timer} optimized for approximated I/O timeout scheduling.
46   *
47   * <h3>Tick Duration</h3>
48   *
49   * As described with 'approximated', this timer does not execute the scheduled
50   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
51   * check if there are any {@link TimerTask}s behind the schedule and execute
52   * them.
53   * <p>
54   * You can increase or decrease the accuracy of the execution timing by
55   * specifying smaller or larger tick duration in the constructor.  In most
56   * network applications, I/O timeout does not need to be accurate.  Therefore,
57   * the default tick duration is 100 milliseconds and you will not need to try
58   * different configurations in most cases.
59   *
60   * <h3>Ticks per Wheel (Wheel Size)</h3>
61   *
62   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
63   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
64   * function is 'dead line of the task'.  The default number of ticks per wheel
65   * (i.e. the size of the wheel) is 512.  You could specify a larger value
66   * if you are going to schedule a lot of timeouts.
67   *
68   * <h3>Do not create many instances.</h3>
69   *
70   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
71   * started.  Therefore, you should make sure to create only one instance and
72   * share it across your application.  One of the common mistakes, that makes
73   * your application unresponsive, is to create a new instance for every connection.
74   *
75   * <h3>Implementation Details</h3>
76   *
77   * {@link HashedWheelTimer} is based on
78   * <a href="https://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
79   * Tony Lauck's paper,
80   * <a href="https://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
81   * and Hierarchical Timing Wheels: data structures to efficiently implement a
82   * timer facility'</a>.  More comprehensive slides are located
83   * <a href="https://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
84   */
85  public class HashedWheelTimer implements Timer {
86  
87      static final InternalLogger logger =
88              InternalLoggerFactory.getInstance(HashedWheelTimer.class);
89  
90      private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
91      private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
92      private static final int INSTANCE_COUNT_LIMIT = 64;
93      private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
94      private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
95              .newResourceLeakDetector(HashedWheelTimer.class, 1);
96  
97      private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
98              AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
99  
100     private final ResourceLeakTracker<HashedWheelTimer> leak;
101     private final Worker worker = new Worker();
102     private final Thread workerThread;
103 
104     public static final int WORKER_STATE_INIT = 0;
105     public static final int WORKER_STATE_STARTED = 1;
106     public static final int WORKER_STATE_SHUTDOWN = 2;
107     @SuppressWarnings({"unused", "FieldMayBeFinal"})
108     private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
109 
110     private final long tickDuration;
111     private final HashedWheelBucket[] wheel;
112     private final int mask;
113     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
114     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
115     private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
116     private final AtomicLong pendingTimeouts = new AtomicLong(0);
117     private final long maxPendingTimeouts;
118     private final Executor taskExecutor;
119 
120     private volatile long startTime;
121 
122     /**
123      * Creates a new timer with the default thread factory
124      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
125      * default number of ticks per wheel.
126      */
127     public HashedWheelTimer() {
128         this(Executors.defaultThreadFactory());
129     }
130 
131     /**
132      * Creates a new timer with the default thread factory
133      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
134      * per wheel.
135      *
136      * @param tickDuration the duration between tick
137      * @param unit         the time unit of the {@code tickDuration}
138      * @throws NullPointerException     if {@code unit} is {@code null}
139      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
140      */
141     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
142         this(Executors.defaultThreadFactory(), tickDuration, unit);
143     }
144 
145     /**
146      * Creates a new timer with the default thread factory
147      * ({@link Executors#defaultThreadFactory()}).
148      *
149      * @param tickDuration  the duration between tick
150      * @param unit          the time unit of the {@code tickDuration}
151      * @param ticksPerWheel the size of the wheel
152      * @throws NullPointerException     if {@code unit} is {@code null}
153      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
154      */
155     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
156         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
157     }
158 
159     /**
160      * Creates a new timer with the default tick duration and default number of
161      * ticks per wheel.
162      *
163      * @param threadFactory a {@link ThreadFactory} that creates a
164      *                      background {@link Thread} which is dedicated to
165      *                      {@link TimerTask} execution.
166      * @throws NullPointerException if {@code threadFactory} is {@code null}
167      */
168     public HashedWheelTimer(ThreadFactory threadFactory) {
169         this(threadFactory, 100, TimeUnit.MILLISECONDS);
170     }
171 
172     /**
173      * Creates a new timer with the default number of ticks per wheel.
174      *
175      * @param threadFactory a {@link ThreadFactory} that creates a
176      *                      background {@link Thread} which is dedicated to
177      *                      {@link TimerTask} execution.
178      * @param tickDuration  the duration between tick
179      * @param unit          the time unit of the {@code tickDuration}
180      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
181      * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
182      */
183     public HashedWheelTimer(
184             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
185         this(threadFactory, tickDuration, unit, 512);
186     }
187 
188     /**
189      * Creates a new timer.
190      *
191      * @param threadFactory a {@link ThreadFactory} that creates a
192      *                      background {@link Thread} which is dedicated to
193      *                      {@link TimerTask} execution.
194      * @param tickDuration  the duration between tick
195      * @param unit          the time unit of the {@code tickDuration}
196      * @param ticksPerWheel the size of the wheel
197      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
198      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
199      */
200     public HashedWheelTimer(
201             ThreadFactory threadFactory,
202             long tickDuration, TimeUnit unit, int ticksPerWheel) {
203         this(threadFactory, tickDuration, unit, ticksPerWheel, true);
204     }
205 
206     /**
207      * Creates a new timer.
208      *
209      * @param threadFactory a {@link ThreadFactory} that creates a
210      *                      background {@link Thread} which is dedicated to
211      *                      {@link TimerTask} execution.
212      * @param tickDuration  the duration between tick
213      * @param unit          the time unit of the {@code tickDuration}
214      * @param ticksPerWheel the size of the wheel
215      * @param leakDetection {@code true} if leak detection should be enabled always,
216      *                      if false it will only be enabled if the worker thread is not
217      *                      a daemon thread.
218      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
219      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
220      */
221     public HashedWheelTimer(
222             ThreadFactory threadFactory,
223             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
224         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
225     }
226 
227     /**
228      * Creates a new timer.
229      *
230      * @param threadFactory        a {@link ThreadFactory} that creates a
231      *                             background {@link Thread} which is dedicated to
232      *                             {@link TimerTask} execution.
233      * @param tickDuration         the duration between tick
234      * @param unit                 the time unit of the {@code tickDuration}
235      * @param ticksPerWheel        the size of the wheel
236      * @param leakDetection        {@code true} if leak detection should be enabled always,
237      *                             if false it will only be enabled if the worker thread is not
238      *                             a daemon thread.
239      * @param  maxPendingTimeouts  The maximum number of pending timeouts after which call to
240      *                             {@code newTimeout} will result in
241      *                             {@link java.util.concurrent.RejectedExecutionException}
242      *                             being thrown. No maximum pending timeouts limit is assumed if
243      *                             this value is 0 or negative.
244      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
245      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
246      */
247     public HashedWheelTimer(
248             ThreadFactory threadFactory,
249             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
250             long maxPendingTimeouts) {
251         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
252                 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
253     }
254     /**
255      * Creates a new timer.
256      *
257      * @param threadFactory        a {@link ThreadFactory} that creates a
258      *                             background {@link Thread} which is dedicated to
259      *                             {@link TimerTask} execution.
260      * @param tickDuration         the duration between tick
261      * @param unit                 the time unit of the {@code tickDuration}
262      * @param ticksPerWheel        the size of the wheel
263      * @param leakDetection        {@code true} if leak detection should be enabled always,
264      *                             if false it will only be enabled if the worker thread is not
265      *                             a daemon thread.
266      * @param maxPendingTimeouts   The maximum number of pending timeouts after which call to
267      *                             {@code newTimeout} will result in
268      *                             {@link java.util.concurrent.RejectedExecutionException}
269      *                             being thrown. No maximum pending timeouts limit is assumed if
270      *                             this value is 0 or negative.
271      * @param taskExecutor         The {@link Executor} that is used to execute the submitted {@link TimerTask}s.
272      *                             The caller is responsible to shutdown the {@link Executor} once it is not needed
273      *                             anymore.
274      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
275      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
276      */
277     public HashedWheelTimer(
278             ThreadFactory threadFactory,
279             long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
280             long maxPendingTimeouts, Executor taskExecutor) {
281 
282         checkNotNull(threadFactory, "threadFactory");
283         checkNotNull(unit, "unit");
284         checkPositive(tickDuration, "tickDuration");
285         checkPositive(ticksPerWheel, "ticksPerWheel");
286         this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
287 
288         // Normalize ticksPerWheel to power of two and initialize the wheel.
289         wheel = createWheel(ticksPerWheel);
290         mask = wheel.length - 1;
291 
292         // Convert tickDuration to nanos.
293         long duration = unit.toNanos(tickDuration);
294 
295         // Prevent overflow.
296         if (duration >= Long.MAX_VALUE / wheel.length) {
297             throw new IllegalArgumentException(String.format(
298                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
299                     tickDuration, Long.MAX_VALUE / wheel.length));
300         }
301 
302         if (duration < MILLISECOND_NANOS) {
303             logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
304                         tickDuration, MILLISECOND_NANOS);
305             this.tickDuration = MILLISECOND_NANOS;
306         } else {
307             this.tickDuration = duration;
308         }
309 
310         workerThread = threadFactory.newThread(worker);
311 
312         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
313 
314         this.maxPendingTimeouts = maxPendingTimeouts;
315 
316         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
317             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
318             reportTooManyInstances();
319         }
320     }
321 
322     @Override
323     protected void finalize() throws Throwable {
324         try {
325             super.finalize();
326         } finally {
327             // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
328             // we have not yet shutdown then we want to make sure we decrement the active instance count.
329             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
330                 INSTANCE_COUNTER.decrementAndGet();
331             }
332         }
333     }
334 
335     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
336         ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
337 
338         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
339         for (int i = 0; i < wheel.length; i ++) {
340             wheel[i] = new HashedWheelBucket();
341         }
342         return wheel;
343     }
344 
345     /**
346      * Starts the background thread explicitly.  The background thread will
347      * start automatically on demand even if you did not call this method.
348      *
349      * @throws IllegalStateException if this timer has been
350      *                               {@linkplain #stop() stopped} already
351      */
352     public void start() {
353         switch (WORKER_STATE_UPDATER.get(this)) {
354             case WORKER_STATE_INIT:
355                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
356                     workerThread.start();
357                 }
358                 break;
359             case WORKER_STATE_STARTED:
360                 break;
361             case WORKER_STATE_SHUTDOWN:
362                 throw new IllegalStateException("cannot be started once stopped");
363             default:
364                 throw new Error("Invalid WorkerState");
365         }
366 
367         // Wait until the startTime is initialized by the worker.
368         while (startTime == 0) {
369             try {
370                 startTimeInitialized.await();
371             } catch (InterruptedException ignore) {
372                 // Ignore - it will be ready very soon.
373             }
374         }
375     }
376 
377     @Override
378     public Set<Timeout> stop() {
379         if (Thread.currentThread() == workerThread) {
380             throw new IllegalStateException(
381                     HashedWheelTimer.class.getSimpleName() +
382                             ".stop() cannot be called from " +
383                             TimerTask.class.getSimpleName());
384         }
385 
386         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
387             // workerState can be 0 or 2 at this moment - let it always be 2.
388             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
389                 INSTANCE_COUNTER.decrementAndGet();
390                 if (leak != null) {
391                     boolean closed = leak.close(this);
392                     assert closed;
393                 }
394             }
395 
396             return Collections.emptySet();
397         }
398 
399         try {
400             boolean interrupted = false;
401             while (workerThread.isAlive()) {
402                 workerThread.interrupt();
403                 try {
404                     workerThread.join(100);
405                 } catch (InterruptedException ignored) {
406                     interrupted = true;
407                 }
408             }
409 
410             if (interrupted) {
411                 Thread.currentThread().interrupt();
412             }
413         } finally {
414             INSTANCE_COUNTER.decrementAndGet();
415             if (leak != null) {
416                 boolean closed = leak.close(this);
417                 assert closed;
418             }
419         }
420         Set<Timeout> unprocessed = worker.unprocessedTimeouts();
421         Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
422         for (Timeout timeout : unprocessed) {
423             if (timeout.cancel()) {
424                 cancelled.add(timeout);
425             }
426         }
427         return cancelled;
428     }
429 
430     @Override
431     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
432         checkNotNull(task, "task");
433         checkNotNull(unit, "unit");
434 
435         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
436 
437         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
438             pendingTimeouts.decrementAndGet();
439             throw new RejectedExecutionException("Number of pending timeouts ("
440                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
441                 + "timeouts (" + maxPendingTimeouts + ")");
442         }
443 
444         start();
445 
446         // Add the timeout to the timeout queue which will be processed on the next tick.
447         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
448         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
449 
450         // Guard against overflow.
451         if (delay > 0 && deadline < 0) {
452             deadline = Long.MAX_VALUE;
453         }
454         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
455         timeouts.add(timeout);
456         return timeout;
457     }
458 
459     /**
460      * Returns the number of pending timeouts of this {@link Timer}.
461      */
462     public long pendingTimeouts() {
463         return pendingTimeouts.get();
464     }
465 
466     private static void reportTooManyInstances() {
467         if (logger.isErrorEnabled()) {
468             String resourceType = simpleClassName(HashedWheelTimer.class);
469             logger.error("You are creating too many " + resourceType + " instances. " +
470                     resourceType + " is a shared resource that must be reused across the JVM, " +
471                     "so that only a few instances are created.");
472         }
473     }
474 
475     private final class Worker implements Runnable {
476         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
477 
478         private long tick;
479 
480         @Override
481         public void run() {
482             // Initialize the startTime.
483             startTime = System.nanoTime();
484             if (startTime == 0) {
485                 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
486                 startTime = 1;
487             }
488 
489             // Notify the other threads waiting for the initialization at start().
490             startTimeInitialized.countDown();
491 
492             do {
493                 final long deadline = waitForNextTick();
494                 if (deadline > 0) {
495                     int idx = (int) (tick & mask);
496                     processCancelledTasks();
497                     HashedWheelBucket bucket =
498                             wheel[idx];
499                     transferTimeoutsToBuckets();
500                     bucket.expireTimeouts(deadline);
501                     tick++;
502                 }
503             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
504 
505             // Fill the unprocessedTimeouts so we can return them from stop() method.
506             for (HashedWheelBucket bucket: wheel) {
507                 bucket.clearTimeouts(unprocessedTimeouts);
508             }
509             for (;;) {
510                 HashedWheelTimeout timeout = timeouts.poll();
511                 if (timeout == null) {
512                     break;
513                 }
514                 if (!timeout.isCancelled()) {
515                     unprocessedTimeouts.add(timeout);
516                 }
517             }
518             processCancelledTasks();
519         }
520 
521         private void transferTimeoutsToBuckets() {
522             // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
523             // adds new timeouts in a loop.
524             for (int i = 0; i < 100000; i++) {
525                 HashedWheelTimeout timeout = timeouts.poll();
526                 if (timeout == null) {
527                     // all processed
528                     break;
529                 }
530                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
531                     // Was cancelled in the meantime.
532                     continue;
533                 }
534 
535                 long calculated = timeout.deadline / tickDuration;
536                 timeout.remainingRounds = (calculated - tick) / wheel.length;
537 
538                 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
539                 int stopIndex = (int) (ticks & mask);
540 
541                 HashedWheelBucket bucket = wheel[stopIndex];
542                 bucket.addTimeout(timeout);
543             }
544         }
545 
546         private void processCancelledTasks() {
547             for (;;) {
548                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
549                 if (timeout == null) {
550                     // all processed
551                     break;
552                 }
553                 try {
554                     timeout.removeAfterCancellation();
555                 } catch (Throwable t) {
556                     if (logger.isWarnEnabled()) {
557                         logger.warn("An exception was thrown while process a cancellation task", t);
558                     }
559                 }
560             }
561         }
562 
563         /**
564          * calculate goal nanoTime from startTime and current tick number,
565          * then wait until that goal has been reached.
566          * @return Long.MIN_VALUE if received a shutdown request,
567          * current time otherwise (with Long.MIN_VALUE changed by +1)
568          */
569         private long waitForNextTick() {
570             long deadline = tickDuration * (tick + 1);
571 
572             for (;;) {
573                 final long currentTime = System.nanoTime() - startTime;
574                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
575 
576                 if (sleepTimeMs <= 0) {
577                     if (currentTime == Long.MIN_VALUE) {
578                         return -Long.MAX_VALUE;
579                     } else {
580                         return currentTime;
581                     }
582                 }
583 
584                 // Check if we run on windows, as if thats the case we will need
585                 // to round the sleepTime as workaround for a bug that only affect
586                 // the JVM if it runs on windows.
587                 //
588                 // See https://github.com/netty/netty/issues/356
589                 if (PlatformDependent.isWindows()) {
590                     sleepTimeMs = sleepTimeMs / 10 * 10;
591                     if (sleepTimeMs == 0) {
592                         sleepTimeMs = 1;
593                     }
594                 }
595 
596                 try {
597                     Thread.sleep(sleepTimeMs);
598                 } catch (InterruptedException ignored) {
599                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
600                         return Long.MIN_VALUE;
601                     }
602                 }
603             }
604         }
605 
606         public Set<Timeout> unprocessedTimeouts() {
607             return Collections.unmodifiableSet(unprocessedTimeouts);
608         }
609     }
610 
611     private static final class HashedWheelTimeout implements Timeout, Runnable {
612 
613         private static final int ST_INIT = 0;
614         private static final int ST_CANCELLED = 1;
615         private static final int ST_EXPIRED = 2;
616         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
617                 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
618 
619         private final HashedWheelTimer timer;
620         private final TimerTask task;
621         private final long deadline;
622 
623         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
624         private volatile int state = ST_INIT;
625 
626         // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
627         // HashedWheelTimeout will be added to the correct HashedWheelBucket.
628         long remainingRounds;
629 
630         // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
631         // As only the workerThread will act on it there is no need for synchronization / volatile.
632         HashedWheelTimeout next;
633         HashedWheelTimeout prev;
634 
635         // The bucket to which the timeout was added
636         HashedWheelBucket bucket;
637 
638         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
639             this.timer = timer;
640             this.task = task;
641             this.deadline = deadline;
642         }
643 
644         @Override
645         public Timer timer() {
646             return timer;
647         }
648 
649         @Override
650         public TimerTask task() {
651             return task;
652         }
653 
654         @Override
655         public boolean cancel() {
656             // only update the state it will be removed from HashedWheelBucket on next tick.
657             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
658                 return false;
659             }
660             // If a task should be canceled we put this to another queue which will be processed on each tick.
661             // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
662             // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
663             timer.cancelledTimeouts.add(this);
664             return true;
665         }
666 
667         private void remove() {
668             HashedWheelBucket bucket = this.bucket;
669             if (bucket != null) {
670                 bucket.remove(this);
671             }
672             timer.pendingTimeouts.decrementAndGet();
673         }
674         void removeAfterCancellation() {
675             remove();
676             task.cancelled(this);
677         }
678 
679         public boolean compareAndSetState(int expected, int state) {
680             return STATE_UPDATER.compareAndSet(this, expected, state);
681         }
682 
683         public int state() {
684             return state;
685         }
686 
687         @Override
688         public boolean isCancelled() {
689             return state() == ST_CANCELLED;
690         }
691 
692         @Override
693         public boolean isExpired() {
694             return state() == ST_EXPIRED;
695         }
696 
697         public void expire() {
698             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
699                 return;
700             }
701 
702             try {
703                 remove();
704                 timer.taskExecutor.execute(this);
705             } catch (Throwable t) {
706                 if (logger.isWarnEnabled()) {
707                     logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
708                             + " for execution.", t);
709                 }
710             }
711         }
712 
713         @Override
714         public void run() {
715             try {
716                 task.run(this);
717             } catch (Throwable t) {
718                 if (logger.isWarnEnabled()) {
719                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
720                 }
721             }
722         }
723 
724         @Override
725         public String toString() {
726             final long currentTime = System.nanoTime();
727             long remaining = deadline - currentTime + timer.startTime;
728 
729             StringBuilder buf = new StringBuilder(192)
730                .append(simpleClassName(this))
731                .append('(')
732                .append("deadline: ");
733             if (remaining > 0) {
734                 buf.append(remaining)
735                    .append(" ns later");
736             } else if (remaining < 0) {
737                 buf.append(-remaining)
738                    .append(" ns ago");
739             } else {
740                 buf.append("now");
741             }
742 
743             if (isCancelled()) {
744                 buf.append(", cancelled");
745             }
746 
747             return buf.append(", task: ")
748                       .append(task())
749                       .append(')')
750                       .toString();
751         }
752     }
753 
754     /**
755      * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
756      * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
757      * extra object creation is needed.
758      */
759     private static final class HashedWheelBucket {
760         // Used for the linked-list datastructure
761         private HashedWheelTimeout head;
762         private HashedWheelTimeout tail;
763 
764         /**
765          * Add {@link HashedWheelTimeout} to this bucket.
766          */
767         public void addTimeout(HashedWheelTimeout timeout) {
768             assert timeout.bucket == null;
769             timeout.bucket = this;
770             if (head == null) {
771                 head = tail = timeout;
772             } else {
773                 tail.next = timeout;
774                 timeout.prev = tail;
775                 tail = timeout;
776             }
777         }
778 
779         /**
780          * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
781          */
782         public void expireTimeouts(long deadline) {
783             HashedWheelTimeout timeout = head;
784 
785             // process all timeouts
786             while (timeout != null) {
787                 HashedWheelTimeout next = timeout.next;
788                 if (timeout.remainingRounds <= 0) {
789                     if (timeout.deadline <= deadline) {
790                         timeout.expire();
791                     } else {
792                         // The timeout was placed into a wrong slot. This should never happen.
793                         throw new IllegalStateException(String.format(
794                                 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
795                     }
796                 } else if (!timeout.isCancelled()) {
797                     timeout.remainingRounds --;
798                 }
799                 timeout = next;
800             }
801         }
802 
803         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
804             HashedWheelTimeout next = timeout.next;
805             // remove timeout that was either processed or cancelled by updating the linked-list
806             if (timeout.prev != null) {
807                 timeout.prev.next = next;
808             }
809             if (timeout.next != null) {
810                 timeout.next.prev = timeout.prev;
811             }
812 
813             if (timeout == head) {
814                 // if timeout is also the tail we need to adjust the entry too
815                 if (timeout == tail) {
816                     tail = null;
817                     head = null;
818                 } else {
819                     head = next;
820                 }
821             } else if (timeout == tail) {
822                 // if the timeout is the tail modify the tail to be the prev node.
823                 tail = timeout.prev;
824             }
825             // null out prev, next and bucket to allow for GC.
826             timeout.prev = null;
827             timeout.next = null;
828             timeout.bucket = null;
829             return next;
830         }
831 
832         /**
833          * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
834          */
835         public void clearTimeouts(Set<Timeout> set) {
836             for (;;) {
837                 HashedWheelTimeout timeout = pollTimeout();
838                 if (timeout == null) {
839                     return;
840                 }
841                 if (timeout.isExpired() || timeout.isCancelled()) {
842                     continue;
843                 }
844                 set.add(timeout);
845             }
846         }
847 
848         private HashedWheelTimeout pollTimeout() {
849             HashedWheelTimeout head = this.head;
850             if (head == null) {
851                 return null;
852             }
853             HashedWheelTimeout next = head.next;
854             if (next == null) {
855                 tail = this.head =  null;
856             } else {
857                 this.head = next;
858                 next.prev = null;
859             }
860 
861             // null out prev and next to allow for GC.
862             head.next = null;
863             head.prev = null;
864             head.bucket = null;
865             return head;
866         }
867     }
868 }