View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import io.netty.util.internal.MpscLinkedQueueNode;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.StringUtil;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.util.Collections;
25  import java.util.HashSet;
26  import java.util.Queue;
27  import java.util.Set;
28  import java.util.concurrent.CountDownLatch;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.ThreadFactory;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
33  
34  /**
35   * A {@link Timer} optimized for approximated I/O timeout scheduling.
36   *
37   * <h3>Tick Duration</h3>
38   *
39   * As described with 'approximated', this timer does not execute the scheduled
40   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
41   * check if there are any {@link TimerTask}s behind the schedule and execute
42   * them.
43   * <p>
44   * You can increase or decrease the accuracy of the execution timing by
45   * specifying smaller or larger tick duration in the constructor.  In most
46   * network applications, I/O timeout does not need to be accurate.  Therefore,
47   * the default tick duration is 100 milliseconds and you will not need to try
48   * different configurations in most cases.
49   *
50   * <h3>Ticks per Wheel (Wheel Size)</h3>
51   *
52   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
53   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
54   * function is 'dead line of the task'.  The default number of ticks per wheel
55   * (i.e. the size of the wheel) is 512.  You could specify a larger value
56   * if you are going to schedule a lot of timeouts.
57   *
58   * <h3>Do not create many instances.</h3>
59   *
60   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
61   * started.  Therefore, you should make sure to create only one instance and
62   * share it across your application.  One of the common mistakes, that makes
63   * your application unresponsive, is to create a new instance for every connection.
64   *
65   * <h3>Implementation Details</h3>
66   *
67   * {@link HashedWheelTimer} is based on
68   * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
69   * Tony Lauck's paper,
70   * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
71   * and Hierarchical Timing Wheels: data structures to efficiently implement a
72   * timer facility'</a>.  More comprehensive slides are located
73   * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
74   */
75  public class HashedWheelTimer implements Timer {
76  
77      static final InternalLogger logger =
78              InternalLoggerFactory.getInstance(HashedWheelTimer.class);
79  
80      private static final ResourceLeakDetector<HashedWheelTimer> leakDetector =
81              new ResourceLeakDetector<HashedWheelTimer>(
82                      HashedWheelTimer.class, 1, Runtime.getRuntime().availableProcessors() * 4);
83  
84      private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER;
85      static {
86          AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater =
87                  PlatformDependent.newAtomicIntegerFieldUpdater(HashedWheelTimer.class, "workerState");
88          if (workerStateUpdater == null) {
89              workerStateUpdater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
90          }
91          WORKER_STATE_UPDATER = workerStateUpdater;
92      }
93  
94      private final ResourceLeak leak;
95      private final Worker worker = new Worker();
96      private final Thread workerThread;
97  
98      public static final int WORKER_STATE_INIT = 0;
99      public static final int WORKER_STATE_STARTED = 1;
100     public static final int WORKER_STATE_SHUTDOWN = 2;
101     @SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
102     private volatile int workerState = WORKER_STATE_INIT; // 0 - init, 1 - started, 2 - shut down
103 
104     private final long tickDuration;
105     private final HashedWheelBucket[] wheel;
106     private final int mask;
107     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
108     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
109     private final Queue<Runnable> cancelledTimeouts = PlatformDependent.newMpscQueue();
110 
111     private volatile long startTime;
112 
113     /**
114      * Creates a new timer with the default thread factory
115      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
116      * default number of ticks per wheel.
117      */
118     public HashedWheelTimer() {
119         this(Executors.defaultThreadFactory());
120     }
121 
122     /**
123      * Creates a new timer with the default thread factory
124      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
125      * per wheel.
126      *
127      * @param tickDuration   the duration between tick
128      * @param unit           the time unit of the {@code tickDuration}
129      * @throws NullPointerException     if {@code unit} is {@code null}
130      * @throws IllegalArgumentException if {@code tickDuration} is <= 0
131      */
132     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
133         this(Executors.defaultThreadFactory(), tickDuration, unit);
134     }
135 
136     /**
137      * Creates a new timer with the default thread factory
138      * ({@link Executors#defaultThreadFactory()}).
139      *
140      * @param tickDuration   the duration between tick
141      * @param unit           the time unit of the {@code tickDuration}
142      * @param ticksPerWheel  the size of the wheel
143      * @throws NullPointerException     if {@code unit} is {@code null}
144      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
145      */
146     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
147         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
148     }
149 
150     /**
151      * Creates a new timer with the default tick duration and default number of
152      * ticks per wheel.
153      *
154      * @param threadFactory  a {@link ThreadFactory} that creates a
155      *                       background {@link Thread} which is dedicated to
156      *                       {@link TimerTask} execution.
157      * @throws NullPointerException if {@code threadFactory} is {@code null}
158      */
159     public HashedWheelTimer(ThreadFactory threadFactory) {
160         this(threadFactory, 100, TimeUnit.MILLISECONDS);
161     }
162 
163     /**
164      * Creates a new timer with the default number of ticks per wheel.
165      *
166      * @param threadFactory  a {@link ThreadFactory} that creates a
167      *                       background {@link Thread} which is dedicated to
168      *                       {@link TimerTask} execution.
169      * @param tickDuration   the duration between tick
170      * @param unit           the time unit of the {@code tickDuration}
171      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
172      * @throws IllegalArgumentException if {@code tickDuration} is <= 0
173      */
174     public HashedWheelTimer(
175             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
176         this(threadFactory, tickDuration, unit, 512);
177     }
178 
179     /**
180      * Creates a new timer.
181      *
182      * @param threadFactory  a {@link ThreadFactory} that creates a
183      *                       background {@link Thread} which is dedicated to
184      *                       {@link TimerTask} execution.
185      * @param tickDuration   the duration between tick
186      * @param unit           the time unit of the {@code tickDuration}
187      * @param ticksPerWheel  the size of the wheel
188      * @throws NullPointerException     if either of {@code threadFactory} and {@code unit} is {@code null}
189      * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
190      */
191     public HashedWheelTimer(
192             ThreadFactory threadFactory,
193             long tickDuration, TimeUnit unit, int ticksPerWheel) {
194 
195         if (threadFactory == null) {
196             throw new NullPointerException("threadFactory");
197         }
198         if (unit == null) {
199             throw new NullPointerException("unit");
200         }
201         if (tickDuration <= 0) {
202             throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
203         }
204         if (ticksPerWheel <= 0) {
205             throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
206         }
207 
208         // Normalize ticksPerWheel to power of two and initialize the wheel.
209         wheel = createWheel(ticksPerWheel);
210         mask = wheel.length - 1;
211 
212         // Convert tickDuration to nanos.
213         this.tickDuration = unit.toNanos(tickDuration);
214 
215         // Prevent overflow.
216         if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
217             throw new IllegalArgumentException(String.format(
218                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
219                     tickDuration, Long.MAX_VALUE / wheel.length));
220         }
221         workerThread = threadFactory.newThread(worker);
222 
223         leak = leakDetector.open(this);
224     }
225 
226     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
227         if (ticksPerWheel <= 0) {
228             throw new IllegalArgumentException(
229                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
230         }
231         if (ticksPerWheel > 1073741824) {
232             throw new IllegalArgumentException(
233                     "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
234         }
235 
236         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
237         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
238         for (int i = 0; i < wheel.length; i ++) {
239             wheel[i] = new HashedWheelBucket();
240         }
241         return wheel;
242     }
243 
244     private static int normalizeTicksPerWheel(int ticksPerWheel) {
245         int normalizedTicksPerWheel = 1;
246         while (normalizedTicksPerWheel < ticksPerWheel) {
247             normalizedTicksPerWheel <<= 1;
248         }
249         return normalizedTicksPerWheel;
250     }
251 
252     /**
253      * Starts the background thread explicitly.  The background thread will
254      * start automatically on demand even if you did not call this method.
255      *
256      * @throws IllegalStateException if this timer has been
257      *                               {@linkplain #stop() stopped} already
258      */
259     public void start() {
260         switch (WORKER_STATE_UPDATER.get(this)) {
261             case WORKER_STATE_INIT:
262                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
263                     workerThread.start();
264                 }
265                 break;
266             case WORKER_STATE_STARTED:
267                 break;
268             case WORKER_STATE_SHUTDOWN:
269                 throw new IllegalStateException("cannot be started once stopped");
270             default:
271                 throw new Error("Invalid WorkerState");
272         }
273 
274         // Wait until the startTime is initialized by the worker.
275         while (startTime == 0) {
276             try {
277                 startTimeInitialized.await();
278             } catch (InterruptedException ignore) {
279                 // Ignore - it will be ready very soon.
280             }
281         }
282     }
283 
284     @Override
285     public Set<Timeout> stop() {
286         if (Thread.currentThread() == workerThread) {
287             throw new IllegalStateException(
288                     HashedWheelTimer.class.getSimpleName() +
289                             ".stop() cannot be called from " +
290                             TimerTask.class.getSimpleName());
291         }
292 
293         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
294             // workerState can be 0 or 2 at this moment - let it always be 2.
295             WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN);
296 
297             if (leak != null) {
298                 leak.close();
299             }
300 
301             return Collections.emptySet();
302         }
303 
304         boolean interrupted = false;
305         while (workerThread.isAlive()) {
306             workerThread.interrupt();
307             try {
308                 workerThread.join(100);
309             } catch (InterruptedException ignored) {
310                 interrupted = true;
311             }
312         }
313 
314         if (interrupted) {
315             Thread.currentThread().interrupt();
316         }
317 
318         if (leak != null) {
319             leak.close();
320         }
321         return worker.unprocessedTimeouts();
322     }
323 
324     @Override
325     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
326         if (task == null) {
327             throw new NullPointerException("task");
328         }
329         if (unit == null) {
330             throw new NullPointerException("unit");
331         }
332         start();
333 
334         // Add the timeout to the timeout queue which will be processed on the next tick.
335         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
336         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
337         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
338         timeouts.add(timeout);
339         return timeout;
340     }
341 
342     private final class Worker implements Runnable {
343         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
344 
345         private long tick;
346 
347         @Override
348         public void run() {
349             // Initialize the startTime.
350             startTime = System.nanoTime();
351             if (startTime == 0) {
352                 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
353                 startTime = 1;
354             }
355 
356             // Notify the other threads waiting for the initialization at start().
357             startTimeInitialized.countDown();
358 
359             do {
360                 final long deadline = waitForNextTick();
361                 if (deadline > 0) {
362                     int idx = (int) (tick & mask);
363                     processCancelledTasks();
364                     HashedWheelBucket bucket =
365                             wheel[idx];
366                     transferTimeoutsToBuckets();
367                     bucket.expireTimeouts(deadline);
368                     tick++;
369                 }
370             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
371 
372             // Fill the unprocessedTimeouts so we can return them from stop() method.
373             for (HashedWheelBucket bucket: wheel) {
374                 bucket.clearTimeouts(unprocessedTimeouts);
375             }
376             for (;;) {
377                 HashedWheelTimeout timeout = timeouts.poll();
378                 if (timeout == null) {
379                     break;
380                 }
381                 if (!timeout.isCancelled()) {
382                     unprocessedTimeouts.add(timeout);
383                 }
384             }
385             processCancelledTasks();
386         }
387 
388         private void transferTimeoutsToBuckets() {
389             // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
390             // adds new timeouts in a loop.
391             for (int i = 0; i < 100000; i++) {
392                 HashedWheelTimeout timeout = timeouts.poll();
393                 if (timeout == null) {
394                     // all processed
395                     break;
396                 }
397                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
398                     // Was cancelled in the meantime.
399                     continue;
400                 }
401 
402                 long calculated = timeout.deadline / tickDuration;
403                 timeout.remainingRounds = (calculated - tick) / wheel.length;
404 
405                 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
406                 int stopIndex = (int) (ticks & mask);
407 
408                 HashedWheelBucket bucket = wheel[stopIndex];
409                 bucket.addTimeout(timeout);
410             }
411         }
412 
413         private void processCancelledTasks() {
414             for (;;) {
415                 Runnable task = cancelledTimeouts.poll();
416                 if (task == null) {
417                     // all processed
418                     break;
419                 }
420                 try {
421                     task.run();
422                 } catch (Throwable t) {
423                     if (logger.isWarnEnabled()) {
424                         logger.warn("An exception was thrown while process a cancellation task", t);
425                     }
426                 }
427             }
428         }
429 
430         /**
431          * calculate goal nanoTime from startTime and current tick number,
432          * then wait until that goal has been reached.
433          * @return Long.MIN_VALUE if received a shutdown request,
434          * current time otherwise (with Long.MIN_VALUE changed by +1)
435          */
436         private long waitForNextTick() {
437             long deadline = tickDuration * (tick + 1);
438 
439             for (;;) {
440                 final long currentTime = System.nanoTime() - startTime;
441                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
442 
443                 if (sleepTimeMs <= 0) {
444                     if (currentTime == Long.MIN_VALUE) {
445                         return -Long.MAX_VALUE;
446                     } else {
447                         return currentTime;
448                     }
449                 }
450 
451                 // Check if we run on windows, as if thats the case we will need
452                 // to round the sleepTime as workaround for a bug that only affect
453                 // the JVM if it runs on windows.
454                 //
455                 // See https://github.com/netty/netty/issues/356
456                 if (PlatformDependent.isWindows()) {
457                     sleepTimeMs = sleepTimeMs / 10 * 10;
458                 }
459 
460                 try {
461                     Thread.sleep(sleepTimeMs);
462                 } catch (InterruptedException ignored) {
463                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
464                         return Long.MIN_VALUE;
465                     }
466                 }
467             }
468         }
469 
470         public Set<Timeout> unprocessedTimeouts() {
471             return Collections.unmodifiableSet(unprocessedTimeouts);
472         }
473     }
474 
475     private static final class HashedWheelTimeout extends MpscLinkedQueueNode<Timeout>
476             implements Timeout {
477 
478         private static final int ST_INIT = 0;
479         private static final int ST_CANCELLED = 1;
480         private static final int ST_EXPIRED = 2;
481         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER;
482 
483         static {
484             AtomicIntegerFieldUpdater<HashedWheelTimeout> updater =
485                     PlatformDependent.newAtomicIntegerFieldUpdater(HashedWheelTimeout.class, "state");
486             if (updater == null) {
487                 updater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
488             }
489             STATE_UPDATER = updater;
490         }
491 
492         private final HashedWheelTimer timer;
493         private final TimerTask task;
494         private final long deadline;
495 
496         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
497         private volatile int state = ST_INIT;
498 
499         // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
500         // HashedWheelTimeout will be added to the correct HashedWheelBucket.
501         long remainingRounds;
502 
503         // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
504         // As only the workerThread will act on it there is no need for synchronization / volatile.
505         HashedWheelTimeout next;
506         HashedWheelTimeout prev;
507 
508         // The bucket to which the timeout was added
509         HashedWheelBucket bucket;
510 
511         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
512             this.timer = timer;
513             this.task = task;
514             this.deadline = deadline;
515         }
516 
517         @Override
518         public Timer timer() {
519             return timer;
520         }
521 
522         @Override
523         public TimerTask task() {
524             return task;
525         }
526 
527         @Override
528         public boolean cancel() {
529             // only update the state it will be removed from HashedWheelBucket on next tick.
530             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
531                 return false;
532             }
533             // If a task should be canceled we create a new Runnable for this to another queue which will
534             // be processed on each tick. So this means that we will have a GC latency of max. 1 tick duration
535             // which is good enough. This way we can make again use of our MpscLinkedQueue and so minimize the
536             // locking / overhead as much as possible.
537             //
538             // It is important that we not just add the HashedWheelTimeout itself again as it extends
539             // MpscLinkedQueueNode and so may still be used as tombstone.
540             timer.cancelledTimeouts.add(new Runnable() {
541                 @Override
542                 public void run() {
543                     HashedWheelBucket bucket = HashedWheelTimeout.this.bucket;
544                     if (bucket != null) {
545                         bucket.remove(HashedWheelTimeout.this);
546                     }
547                 }
548             });
549             return true;
550         }
551 
552         public boolean compareAndSetState(int expected, int state) {
553             return STATE_UPDATER.compareAndSet(this, expected, state);
554         }
555 
556         public int state() {
557             return state;
558         }
559 
560         @Override
561         public boolean isCancelled() {
562             return state() == ST_CANCELLED;
563         }
564 
565         @Override
566         public boolean isExpired() {
567             return state() == ST_EXPIRED;
568         }
569 
570         @Override
571         public HashedWheelTimeout value() {
572             return this;
573         }
574 
575         public void expire() {
576             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
577                 return;
578             }
579 
580             try {
581                 task.run(this);
582             } catch (Throwable t) {
583                 if (logger.isWarnEnabled()) {
584                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
585                 }
586             }
587         }
588 
589         @Override
590         public String toString() {
591             final long currentTime = System.nanoTime();
592             long remaining = deadline - currentTime + timer.startTime;
593 
594             StringBuilder buf = new StringBuilder(192)
595                .append(StringUtil.simpleClassName(this))
596                .append('(')
597                .append("deadline: ");
598             if (remaining > 0) {
599                 buf.append(remaining)
600                    .append(" ns later");
601             } else if (remaining < 0) {
602                 buf.append(-remaining)
603                    .append(" ns ago");
604             } else {
605                 buf.append("now");
606             }
607 
608             if (isCancelled()) {
609                 buf.append(", cancelled");
610             }
611 
612             return buf.append(", task: ")
613                       .append(task())
614                       .append(')')
615                       .toString();
616         }
617     }
618 
619     /**
620      * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
621      * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
622      * extra object creation is needed.
623      */
624     private static final class HashedWheelBucket {
625         // Used for the linked-list datastructure
626         private HashedWheelTimeout head;
627         private HashedWheelTimeout tail;
628 
629         /**
630          * Add {@link HashedWheelTimeout} to this bucket.
631          */
632         public void addTimeout(HashedWheelTimeout timeout) {
633             assert timeout.bucket == null;
634             timeout.bucket = this;
635             if (head == null) {
636                 head = tail = timeout;
637             } else {
638                 tail.next = timeout;
639                 timeout.prev = tail;
640                 tail = timeout;
641             }
642         }
643 
644         /**
645          * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
646          */
647         public void expireTimeouts(long deadline) {
648             HashedWheelTimeout timeout = head;
649 
650             // process all timeouts
651             while (timeout != null) {
652                 boolean remove = false;
653                 if (timeout.remainingRounds <= 0) {
654                     if (timeout.deadline <= deadline) {
655                         timeout.expire();
656                     } else {
657                         // The timeout was placed into a wrong slot. This should never happen.
658                         throw new IllegalStateException(String.format(
659                                 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
660                     }
661                     remove = true;
662                 } else if (timeout.isCancelled()) {
663                     remove = true;
664                 } else {
665                     timeout.remainingRounds --;
666                 }
667                 // store reference to next as we may null out timeout.next in the remove block.
668                 HashedWheelTimeout next = timeout.next;
669                 if (remove) {
670                     remove(timeout);
671                 }
672                 timeout = next;
673             }
674         }
675 
676         public void remove(HashedWheelTimeout timeout) {
677             HashedWheelTimeout next = timeout.next;
678             // remove timeout that was either processed or cancelled by updating the linked-list
679             if (timeout.prev != null) {
680                 timeout.prev.next = next;
681             }
682             if (timeout.next != null) {
683                 timeout.next.prev = timeout.prev;
684             }
685 
686             if (timeout == head) {
687                 // if timeout is also the tail we need to adjust the entry too
688                 if (timeout == tail) {
689                     tail = null;
690                     head = null;
691                 } else {
692                     head = next;
693                 }
694             } else if (timeout == tail) {
695                 // if the timeout is the tail modify the tail to be the prev node.
696                 tail = timeout.prev;
697             }
698             // null out prev, next and bucket to allow for GC.
699             timeout.prev = null;
700             timeout.next = null;
701             timeout.bucket = null;
702         }
703 
704         /**
705          * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
706          */
707         public void clearTimeouts(Set<Timeout> set) {
708             for (;;) {
709                 HashedWheelTimeout timeout = pollTimeout();
710                 if (timeout == null) {
711                     return;
712                 }
713                 if (timeout.isExpired() || timeout.isCancelled()) {
714                     continue;
715                 }
716                 set.add(timeout);
717             }
718         }
719 
720         private HashedWheelTimeout pollTimeout() {
721             HashedWheelTimeout head = this.head;
722             if (head == null) {
723                 return null;
724             }
725             HashedWheelTimeout next = head.next;
726             if (next == null) {
727                 tail = this.head =  null;
728             } else {
729                 this.head = next;
730                 next.prev = null;
731             }
732 
733             // null out prev and next to allow for GC.
734             head.next = null;
735             head.prev = null;
736             head.bucket = null;
737             return head;
738         }
739     }
740 }