1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.util;
17
18 import io.netty5.util.concurrent.ImmediateExecutor;
19 import io.netty5.util.internal.PlatformDependent;
20 import io.netty5.util.internal.logging.InternalLogger;
21 import io.netty5.util.internal.logging.InternalLoggerFactory;
22
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.Queue;
26 import java.util.Set;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
36 import java.util.concurrent.atomic.AtomicLong;
37
38 import static io.netty5.util.internal.ObjectUtil.checkInRange;
39 import static io.netty5.util.internal.ObjectUtil.checkPositive;
40 import static io.netty5.util.internal.StringUtil.simpleClassName;
41 import static java.util.Objects.requireNonNull;
42
43 /**
44 * A {@link Timer} optimized for approximated I/O timeout scheduling.
45 *
46 * <h3>Tick Duration</h3>
47 *
 * As the word 'approximated' suggests, this timer does not execute a scheduled
 * {@link TimerTask} exactly on time. Instead, on every tick, {@link HashedWheelTimer}
 * checks whether any {@link TimerTask}s are behind schedule and executes
 * them.
52 * <p>
53 * You can increase or decrease the accuracy of the execution timing by
54 * specifying smaller or larger tick duration in the constructor. In most
55 * network applications, I/O timeout does not need to be accurate. Therefore,
56 * the default tick duration is 100 milliseconds and you will not need to try
57 * different configurations in most cases.
58 *
59 * <h3>Ticks per Wheel (Wheel Size)</h3>
60 *
 * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
 * To put it simply, a wheel is a hash table of {@link TimerTask}s whose hash
 * function is 'deadline of the task'. The default number of ticks per wheel
 * (i.e. the size of the wheel) is 512. You could specify a larger value
 * if you are going to schedule a lot of timeouts.
66 *
67 * <h3>Do not create many instances.</h3>
68 *
69 * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
70 * started. Therefore, you should make sure to create only one instance and
71 * share it across your application. One of the common mistakes, that makes
72 * your application unresponsive, is to create a new instance for every connection.
73 *
74 * <h3>Implementation Details</h3>
75 *
76 * {@link HashedWheelTimer} is based on
77 * <a href="https://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
78 * Tony Lauck's paper,
79 * <a href="https://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
80 * and Hierarchical Timing Wheels: data structures to efficiently implement a
81 * timer facility'</a>. More comprehensive slides are located
82 * <a href="https://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
83 */
84 public class HashedWheelTimer implements Timer {
85
// Logger shared by this class and its nested Worker / HashedWheelTimeout classes.
static final InternalLogger logger =
        InternalLoggerFactory.getInstance(HashedWheelTimer.class);

// Number of timers that have been constructed and not yet stopped or finalized.
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
// Flipped to true (once, via compareAndSet) when the "too many instances" error has been reported.
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
// The constructor reports an error when INSTANCE_COUNTER grows beyond this value.
private static final int INSTANCE_COUNT_LIMIT = 64;
// One millisecond in nanoseconds; the constructor never uses a tick duration below this.
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
// Leak detector for timers that are never stopped.
// NOTE(review): the second argument (1) is presumably the sampling interval - confirm against
// ResourceLeakDetectorFactory.
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
        .newResourceLeakDetector(HashedWheelTimer.class, 1);

// Atomic accessor for the volatile workerState field below (looked up by field name).
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");

// Leak tracker obtained in the constructor; null when leak tracking was not enabled for this instance.
private final ResourceLeakTracker<HashedWheelTimer> leak;
// The Runnable executed by workerThread; it owns the wheel while the timer is running.
private final Worker worker = new Worker();
// Thread created from the supplied ThreadFactory in the constructor and started by start().
private final Thread workerThread;

// Possible values of workerState.
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
// Only ever read and written through WORKER_STATE_UPDATER, hence the suppressed warnings.
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private volatile int workerState; // 0 - init, 1 - started, 2 - shut down

// Length of one tick in nanoseconds (at least MILLISECOND_NANOS).
private final long tickDuration;
// The wheel buckets; the array length is a power of two (see createWheel).
private final HashedWheelBucket[] wheel;
// wheel.length - 1; "tick & mask" maps a tick number to a bucket index.
private final int mask;
// Counted down by the worker once startTime has been set; start() waits on it.
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
// Timeouts created by newTimeout() that the worker has not yet moved into a bucket.
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// Timeouts cancelled via HashedWheelTimeout.cancel(); drained by the worker.
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
// Number of timeouts currently counted as pending (see pendingTimeouts()).
private final AtomicLong pendingTimeouts = new AtomicLong(0);
// Upper bound enforced by newTimeout(); only applied when the value is positive.
private final long maxPendingTimeouts;
// Executor to which expired timeouts are submitted for execution.
private final Executor taskExecutor;

// System.nanoTime() captured when the worker starts running; 0 means "not initialized yet".
private volatile long startTime;
120
/**
 * Creates a timer whose worker thread comes from {@link Executors#defaultThreadFactory()}.
 * Tick duration and wheel size are left at the defaults chosen by the constructors this
 * one delegates to.
 */
public HashedWheelTimer() {
    this(Executors.defaultThreadFactory());
}
129
/**
 * Creates a timer with a caller-chosen tick duration. The worker thread comes from
 * {@link Executors#defaultThreadFactory()} and the wheel size is left at its default.
 *
 * @param tickDuration  length of a single tick, expressed in {@code unit}
 * @param unit          unit in which {@code tickDuration} is given
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
    this(Executors.defaultThreadFactory(), tickDuration, unit);
}
143
/**
 * Creates a timer with a caller-chosen tick duration and wheel size. The worker thread
 * comes from {@link Executors#defaultThreadFactory()}.
 *
 * @param tickDuration  length of a single tick, expressed in {@code unit}
 * @param unit          unit in which {@code tickDuration} is given
 * @param ticksPerWheel requested number of buckets in the wheel
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
157
/**
 * Creates a timer that ticks every 100 milliseconds and uses the default wheel size.
 *
 * @param threadFactory factory used to create the single background {@link Thread}
 *                      that drives this timer
 * @throws NullPointerException if {@code threadFactory} is {@code null}
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
170
/**
 * Creates a timer with a caller-chosen tick duration and a wheel of 512 ticks.
 *
 * @param threadFactory factory used to create the single background {@link Thread}
 *                      that drives this timer
 * @param tickDuration  length of a single tick, expressed in {@code unit}
 * @param unit          unit in which {@code tickDuration} is given
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);
}
186
/**
 * Creates a timer with a caller-chosen tick duration and wheel size; leak detection is
 * always enabled for timers created through this constructor.
 *
 * @param threadFactory factory used to create the single background {@link Thread}
 *                      that drives this timer
 * @param tickDuration  length of a single tick, expressed in {@code unit}
 * @param unit          unit in which {@code tickDuration} is given
 * @param ticksPerWheel requested number of buckets in the wheel
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
204
/**
 * Creates a timer without any limit on the number of pending timeouts.
 *
 * @param threadFactory factory used to create the single background {@link Thread}
 *                      that drives this timer
 * @param tickDuration  length of a single tick, expressed in {@code unit}
 * @param unit          unit in which {@code tickDuration} is given
 * @param ticksPerWheel requested number of buckets in the wheel
 * @param leakDetection {@code true} to always track this timer for leaks; with {@code false}
 *                      it is tracked only when the worker thread is not a daemon thread
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
        boolean leakDetection) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
225
/**
 * Creates a timer whose expired tasks are handed to {@link ImmediateExecutor#INSTANCE}.
 *
 * @param threadFactory      factory used to create the single background {@link Thread}
 *                           that drives this timer
 * @param tickDuration       length of a single tick, expressed in {@code unit}
 * @param unit               unit in which {@code tickDuration} is given
 * @param ticksPerWheel      requested number of buckets in the wheel
 * @param leakDetection      {@code true} to always track this timer for leaks; with {@code false}
 *                           it is tracked only when the worker thread is not a daemon thread
 * @param maxPendingTimeouts once more than this many timeouts are pending, {@code newTimeout}
 *                           throws {@link java.util.concurrent.RejectedExecutionException};
 *                           a value of 0 or less disables the limit
 * @throws NullPointerException     if {@code threadFactory} or {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} or {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
        boolean leakDetection, long maxPendingTimeouts) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, maxPendingTimeouts,
            ImmediateExecutor.INSTANCE);
}
253 /**
254 * Creates a new timer.
255 *
256 * @param threadFactory a {@link ThreadFactory} that creates a
257 * background {@link Thread} which is dedicated to
258 * {@link TimerTask} execution.
259 * @param tickDuration the duration between tick
260 * @param unit the time unit of the {@code tickDuration}
261 * @param ticksPerWheel the size of the wheel
262 * @param leakDetection {@code true} if leak detection should be enabled always,
263 * if false it will only be enabled if the worker thread is not
264 * a daemon thread.
265 * @param maxPendingTimeouts The maximum number of pending timeouts after which call to
266 * {@code newTimeout} will result in
267 * {@link java.util.concurrent.RejectedExecutionException}
268 * being thrown. No maximum pending timeouts limit is assumed if
269 * this value is 0 or negative.
270 * @param taskExecutor The {@link Executor} that is used to execute the submitted {@link TimerTask}s.
271 * The caller is responsible to shutdown the {@link Executor} once it is not needed
272 * anymore.
273 * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
274 * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
275 */
276 public HashedWheelTimer(
277 ThreadFactory threadFactory,
278 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
279 long maxPendingTimeouts, Executor taskExecutor) {
280
281 requireNonNull(threadFactory, "threadFactory");
282 requireNonNull(unit, "unit");
283 checkPositive(tickDuration, "tickDuration");
284 checkPositive(ticksPerWheel, "ticksPerWheel");
285 this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor");
286
287 // Normalize ticksPerWheel to power of two and initialize the wheel.
288 wheel = createWheel(ticksPerWheel);
289 mask = wheel.length - 1;
290
291 // Convert tickDuration to nanos.
292 long duration = unit.toNanos(tickDuration);
293
294 // Prevent overflow.
295 if (duration >= Long.MAX_VALUE / wheel.length) {
296 throw new IllegalArgumentException(String.format(
297 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
298 tickDuration, Long.MAX_VALUE / wheel.length));
299 }
300
301 if (duration < MILLISECOND_NANOS) {
302 logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
303 tickDuration, MILLISECOND_NANOS);
304 this.tickDuration = MILLISECOND_NANOS;
305 } else {
306 this.tickDuration = duration;
307 }
308
309 workerThread = threadFactory.newThread(worker);
310
311 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
312
313 this.maxPendingTimeouts = maxPendingTimeouts;
314
315 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
316 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
317 reportTooManyInstances();
318 }
319 }
320
321 @Override
322 protected void finalize() throws Throwable {
323 try {
324 super.finalize();
325 } finally {
326 // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
327 // we have not yet shutdown then we want to make sure we decrement the active instance count.
328 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
329 INSTANCE_COUNTER.decrementAndGet();
330 }
331 }
332 }
333
334 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
335 //ticksPerWheel may not be greater than 2^30
336 checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
337
338 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
339 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
340 for (int i = 0; i < wheel.length; i ++) {
341 wheel[i] = new HashedWheelBucket();
342 }
343 return wheel;
344 }
345
346 private static int normalizeTicksPerWheel(int ticksPerWheel) {
347 int normalizedTicksPerWheel = 1;
348 while (normalizedTicksPerWheel < ticksPerWheel) {
349 normalizedTicksPerWheel <<= 1;
350 }
351 return normalizedTicksPerWheel;
352 }
353
354 /**
355 * Starts the background thread explicitly. The background thread will
356 * start automatically on demand even if you did not call this method.
357 *
358 * @throws IllegalStateException if this timer has been
359 * {@linkplain #stop() stopped} already
360 */
361 public void start() {
362 switch (WORKER_STATE_UPDATER.get(this)) {
363 case WORKER_STATE_INIT:
364 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
365 workerThread.start();
366 }
367 break;
368 case WORKER_STATE_STARTED:
369 break;
370 case WORKER_STATE_SHUTDOWN:
371 throw new IllegalStateException("cannot be started once stopped");
372 default:
373 throw new Error("Invalid WorkerState");
374 }
375
376 // Wait until the startTime is initialized by the worker.
377 while (startTime == 0) {
378 try {
379 startTimeInitialized.await();
380 } catch (InterruptedException ignore) {
381 // Ignore - it will be ready very soon.
382 }
383 }
384 }
385
386 @Override
387 public Set<Timeout> stop() {
388 if (Thread.currentThread() == workerThread) {
389 throw new IllegalStateException(
390 HashedWheelTimer.class.getSimpleName() +
391 ".stop() cannot be called from " +
392 TimerTask.class.getSimpleName());
393 }
394
395 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
396 // workerState can be 0 or 2 at this moment - let it always be 2.
397 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
398 INSTANCE_COUNTER.decrementAndGet();
399 if (leak != null) {
400 boolean closed = leak.close(this);
401 assert closed;
402 }
403 }
404
405 return Collections.emptySet();
406 }
407
408 try {
409 boolean interrupted = false;
410 while (workerThread.isAlive()) {
411 workerThread.interrupt();
412 try {
413 workerThread.join(100);
414 } catch (InterruptedException ignored) {
415 interrupted = true;
416 }
417 }
418
419 if (interrupted) {
420 Thread.currentThread().interrupt();
421 }
422 } finally {
423 INSTANCE_COUNTER.decrementAndGet();
424 if (leak != null) {
425 boolean closed = leak.close(this);
426 assert closed;
427 }
428 }
429 return worker.unprocessedTimeouts();
430 }
431
432 @Override
433 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
434 requireNonNull(task, "task");
435 requireNonNull(unit, "unit");
436
437 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
438
439 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
440 pendingTimeouts.decrementAndGet();
441 throw new RejectedExecutionException("Number of pending timeouts ("
442 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
443 + "timeouts (" + maxPendingTimeouts + ")");
444 }
445
446 start();
447
448 // Add the timeout to the timeout queue which will be processed on the next tick.
449 // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
450 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
451
452 // Guard against overflow.
453 if (delay > 0 && deadline < 0) {
454 deadline = Long.MAX_VALUE;
455 }
456 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
457 timeouts.add(timeout);
458 return timeout;
459 }
460
461 /**
462 * Returns the number of pending timeouts of this {@link Timer}.
463 */
464 public long pendingTimeouts() {
465 return pendingTimeouts.get();
466 }
467
468 private static void reportTooManyInstances() {
469 if (logger.isErrorEnabled()) {
470 String resourceType = simpleClassName(HashedWheelTimer.class);
471 logger.error("You are creating too many " + resourceType + " instances. " +
472 resourceType + " is a shared resource that must be reused across the JVM, " +
473 "so that only a few instances are created.");
474 }
475 }
476
477 private final class Worker implements Runnable {
478 private final Set<Timeout> unprocessedTimeouts = new HashSet<>();
479
480 private long tick;
481
482 @Override
483 public void run() {
484 // Initialize the startTime.
485 startTime = System.nanoTime();
486 if (startTime == 0) {
487 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
488 startTime = 1;
489 }
490
491 // Notify the other threads waiting for the initialization at start().
492 startTimeInitialized.countDown();
493
494 do {
495 final long deadline = waitForNextTick();
496 if (deadline > 0) {
497 int idx = (int) (tick & mask);
498 processCancelledTasks();
499 HashedWheelBucket bucket =
500 wheel[idx];
501 transferTimeoutsToBuckets();
502 bucket.expireTimeouts(deadline);
503 tick++;
504 }
505 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
506
507 // Fill the unprocessedTimeouts so we can return them from stop() method.
508 for (HashedWheelBucket bucket: wheel) {
509 bucket.clearTimeouts(unprocessedTimeouts);
510 }
511 for (;;) {
512 HashedWheelTimeout timeout = timeouts.poll();
513 if (timeout == null) {
514 break;
515 }
516 if (!timeout.isCancelled()) {
517 unprocessedTimeouts.add(timeout);
518 }
519 }
520 processCancelledTasks();
521 }
522
523 private void transferTimeoutsToBuckets() {
524 // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
525 // adds new timeouts in a loop.
526 for (int i = 0; i < 100000; i++) {
527 HashedWheelTimeout timeout = timeouts.poll();
528 if (timeout == null) {
529 // all processed
530 break;
531 }
532 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
533 // Was cancelled in the meantime.
534 continue;
535 }
536
537 long calculated = timeout.deadline / tickDuration;
538 timeout.remainingRounds = (calculated - tick) / wheel.length;
539
540 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
541 int stopIndex = (int) (ticks & mask);
542
543 HashedWheelBucket bucket = wheel[stopIndex];
544 bucket.addTimeout(timeout);
545 }
546 }
547
548 private void processCancelledTasks() {
549 for (;;) {
550 HashedWheelTimeout timeout = cancelledTimeouts.poll();
551 if (timeout == null) {
552 // all processed
553 break;
554 }
555 try {
556 timeout.remove();
557 } catch (Throwable t) {
558 if (logger.isWarnEnabled()) {
559 logger.warn("An exception was thrown while process a cancellation task", t);
560 }
561 }
562 }
563 }
564
565 /**
566 * calculate goal nanoTime from startTime and current tick number,
567 * then wait until that goal has been reached.
568 * @return Long.MIN_VALUE if received a shutdown request,
569 * current time otherwise (with Long.MIN_VALUE changed by +1)
570 */
571 private long waitForNextTick() {
572 long deadline = tickDuration * (tick + 1);
573
574 for (;;) {
575 final long currentTime = System.nanoTime() - startTime;
576 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
577
578 if (sleepTimeMs <= 0) {
579 if (currentTime == Long.MIN_VALUE) {
580 return -Long.MAX_VALUE;
581 } else {
582 return currentTime;
583 }
584 }
585
586 // Check if we run on windows, as if thats the case we will need
587 // to round the sleepTime as workaround for a bug that only affect
588 // the JVM if it runs on windows.
589 //
590 // See https://github.com/netty/netty/issues/356
591 if (PlatformDependent.isWindows()) {
592 sleepTimeMs = sleepTimeMs / 10 * 10;
593 if (sleepTimeMs == 0) {
594 sleepTimeMs = 1;
595 }
596 }
597
598 try {
599 Thread.sleep(sleepTimeMs);
600 } catch (InterruptedException ignored) {
601 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
602 return Long.MIN_VALUE;
603 }
604 }
605 }
606 }
607
608 public Set<Timeout> unprocessedTimeouts() {
609 return Collections.unmodifiableSet(unprocessedTimeouts);
610 }
611 }
612
613 private static final class HashedWheelTimeout implements Timeout, Runnable {
614
615 private static final int ST_INIT = 0;
616 private static final int ST_CANCELLED = 1;
617 private static final int ST_EXPIRED = 2;
618 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
619 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
620
621 private final HashedWheelTimer timer;
622 private final TimerTask task;
623 private final long deadline;
624
625 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
626 private volatile int state = ST_INIT;
627
628 // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
629 // HashedWheelTimeout will be added to the correct HashedWheelBucket.
630 long remainingRounds;
631
632 // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
633 // As only the workerThread will act on it there is no need for synchronization / volatile.
634 HashedWheelTimeout next;
635 HashedWheelTimeout prev;
636
637 // The bucket to which the timeout was added
638 HashedWheelBucket bucket;
639
640 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
641 this.timer = timer;
642 this.task = task;
643 this.deadline = deadline;
644 }
645
646 @Override
647 public Timer timer() {
648 return timer;
649 }
650
651 @Override
652 public TimerTask task() {
653 return task;
654 }
655
656 @Override
657 public boolean cancel() {
658 // only update the state it will be removed from HashedWheelBucket on next tick.
659 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
660 return false;
661 }
662 // If a task should be canceled we put this to another queue which will be processed on each tick.
663 // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
664 // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
665 timer.cancelledTimeouts.add(this);
666 return true;
667 }
668
669 void remove() {
670 HashedWheelBucket bucket = this.bucket;
671 if (bucket != null) {
672 bucket.remove(this);
673 } else {
674 timer.pendingTimeouts.decrementAndGet();
675 }
676 }
677
678 public boolean compareAndSetState(int expected, int state) {
679 return STATE_UPDATER.compareAndSet(this, expected, state);
680 }
681
682 public int state() {
683 return state;
684 }
685
686 @Override
687 public boolean isCancelled() {
688 return state() == ST_CANCELLED;
689 }
690
691 @Override
692 public boolean isExpired() {
693 return state() == ST_EXPIRED;
694 }
695
696 public void expire() {
697 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
698 return;
699 }
700
701 try {
702 timer.taskExecutor.execute(this);
703 } catch (Throwable t) {
704 if (logger.isWarnEnabled()) {
705 logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
706 + " for execution.", t);
707 }
708 }
709 }
710
711 @Override
712 public void run() {
713 try {
714 task.run(this);
715 } catch (Throwable t) {
716 if (logger.isWarnEnabled()) {
717 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
718 }
719 }
720 }
721
722 @Override
723 public String toString() {
724 final long currentTime = System.nanoTime();
725 long remaining = deadline - currentTime + timer.startTime;
726
727 StringBuilder buf = new StringBuilder(192)
728 .append(simpleClassName(this))
729 .append('(')
730 .append("deadline: ");
731 if (remaining > 0) {
732 buf.append(remaining)
733 .append(" ns later");
734 } else if (remaining < 0) {
735 buf.append(-remaining)
736 .append(" ns ago");
737 } else {
738 buf.append("now");
739 }
740
741 if (isCancelled()) {
742 buf.append(", cancelled");
743 }
744
745 return buf.append(", task: ")
746 .append(task())
747 .append(')')
748 .toString();
749 }
750 }
751
752 /**
753 * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
754 * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
755 * extra object creation is needed.
756 */
757 private static final class HashedWheelBucket {
758 // Used for the linked-list datastructure
759 private HashedWheelTimeout head;
760 private HashedWheelTimeout tail;
761
762 /**
763 * Add {@link HashedWheelTimeout} to this bucket.
764 */
765 public void addTimeout(HashedWheelTimeout timeout) {
766 assert timeout.bucket == null;
767 timeout.bucket = this;
768 if (head == null) {
769 head = tail = timeout;
770 } else {
771 tail.next = timeout;
772 timeout.prev = tail;
773 tail = timeout;
774 }
775 }
776
777 /**
778 * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
779 */
780 public void expireTimeouts(long deadline) {
781 HashedWheelTimeout timeout = head;
782
783 // process all timeouts
784 while (timeout != null) {
785 HashedWheelTimeout next = timeout.next;
786 if (timeout.remainingRounds <= 0) {
787 next = remove(timeout);
788 if (timeout.deadline <= deadline) {
789 timeout.expire();
790 } else {
791 // The timeout was placed into a wrong slot. This should never happen.
792 throw new IllegalStateException(String.format(
793 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
794 }
795 } else if (timeout.isCancelled()) {
796 next = remove(timeout);
797 } else {
798 timeout.remainingRounds --;
799 }
800 timeout = next;
801 }
802 }
803
804 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
805 HashedWheelTimeout next = timeout.next;
806 // remove timeout that was either processed or cancelled by updating the linked-list
807 if (timeout.prev != null) {
808 timeout.prev.next = next;
809 }
810 if (timeout.next != null) {
811 timeout.next.prev = timeout.prev;
812 }
813
814 if (timeout == head) {
815 // if timeout is also the tail we need to adjust the entry too
816 if (timeout == tail) {
817 tail = null;
818 head = null;
819 } else {
820 head = next;
821 }
822 } else if (timeout == tail) {
823 // if the timeout is the tail modify the tail to be the prev node.
824 tail = timeout.prev;
825 }
826 // null out prev, next and bucket to allow for GC.
827 timeout.prev = null;
828 timeout.next = null;
829 timeout.bucket = null;
830 timeout.timer.pendingTimeouts.decrementAndGet();
831 return next;
832 }
833
834 /**
835 * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
836 */
837 public void clearTimeouts(Set<Timeout> set) {
838 for (;;) {
839 HashedWheelTimeout timeout = pollTimeout();
840 if (timeout == null) {
841 return;
842 }
843 if (timeout.isExpired() || timeout.isCancelled()) {
844 continue;
845 }
846 set.add(timeout);
847 }
848 }
849
850 private HashedWheelTimeout pollTimeout() {
851 HashedWheelTimeout head = this.head;
852 if (head == null) {
853 return null;
854 }
855 HashedWheelTimeout next = head.next;
856 if (next == null) {
857 tail = this.head = null;
858 } else {
859 this.head = next;
860 next.prev = null;
861 }
862
863 // null out prev and next to allow for GC.
864 head.next = null;
865 head.prev = null;
866 head.bucket = null;
867 return head;
868 }
869 }
870 }