View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.util;
17  
18  import org.jboss.netty.channel.ChannelPipelineFactory;
19  import org.jboss.netty.logging.InternalLogger;
20  import org.jboss.netty.logging.InternalLoggerFactory;
21  import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
22  import org.jboss.netty.util.internal.DetectionUtil;
23  import org.jboss.netty.util.internal.ReusableIterator;
24  import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
25  
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Set;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.locks.ReadWriteLock;
36  import java.util.concurrent.locks.ReentrantReadWriteLock;
37  
38  /**
39   * A {@link Timer} optimized for approximated I/O timeout scheduling.
40   *
41   * <h3>Tick Duration</h3>
42   *
43   * As described with 'approximated', this timer does not execute the scheduled
44   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
45   * check if there are any {@link TimerTask}s behind the schedule and execute
46   * them.
47   * <p>
48   * You can increase or decrease the accuracy of the execution timing by
49   * specifying smaller or larger tick duration in the constructor.  In most
50   * network applications, I/O timeout does not need to be accurate.  Therefore,
51   * the default tick duration is 100 milliseconds and you will not need to try
52   * different configurations in most cases.
53   *
54   * <h3>Ticks per Wheel (Wheel Size)</h3>
55   *
56   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
57   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
58   * function is 'dead line of the task'.  The default number of ticks per wheel
59   * (i.e. the size of the wheel) is 512.  You could specify a larger value
60   * if you are going to schedule a lot of timeouts.
61   *
62   * <h3>Do not create many instances.</h3>
63   *
64   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
65   * started.  Therefore, you should make sure to create only one instance and
66   * share it across your application.  One of the common mistakes, that makes
67   * your application unresponsive, is to create a new instance in
68   * {@link ChannelPipelineFactory}, which results in the creation of a new thread
69   * for every connection.
70   *
71   * <h3>Implementation Details</h3>
72   *
73   * {@link HashedWheelTimer} is based on
74   * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
75   * Tony Lauck's paper,
76   * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
77   * and Hierarchical Timing Wheels: data structures to efficiently implement a
78   * timer facility'</a>.  More comprehensive slides are located
79   * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
80   */
81  public class HashedWheelTimer implements Timer {
82  
83      static final InternalLogger logger =
84          InternalLoggerFactory.getInstance(HashedWheelTimer.class);
85      private static final AtomicInteger id = new AtomicInteger();
86  
87      private static final SharedResourceMisuseDetector misuseDetector =
88          new SharedResourceMisuseDetector(HashedWheelTimer.class);
89  
90      private final Worker worker = new Worker();
91      final Thread workerThread;
92  
93      public static final int WORKER_STATE_INIT = 0;
94      public static final int WORKER_STATE_STARTED = 1;
95      public static final int WORKER_STATE_SHUTDOWN = 2;
96      final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down
97  
98      final long tickDuration;
99      final Set<HashedWheelTimeout>[] wheel;
100     final ReusableIterator<HashedWheelTimeout>[] iterators;
101     final int mask;
102     final ReadWriteLock lock = new ReentrantReadWriteLock();
103     volatile int wheelCursor;
104 
105     /**
106      * Creates a new timer with the default thread factory
107      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
108      * default number of ticks per wheel.
109      */
110     public HashedWheelTimer() {
111         this(Executors.defaultThreadFactory());
112     }
113 
114     /**
115      * Creates a new timer with the default thread factory
116      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
117      * per wheel.
118      *
119      * @param tickDuration   the duration between tick
120      * @param unit           the time unit of the {@code tickDuration}
121      */
122     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
123         this(Executors.defaultThreadFactory(), tickDuration, unit);
124     }
125 
126     /**
127      * Creates a new timer with the default thread factory
128      * ({@link Executors#defaultThreadFactory()}).
129      *
130      * @param tickDuration   the duration between tick
131      * @param unit           the time unit of the {@code tickDuration}
132      * @param ticksPerWheel  the size of the wheel
133      */
134     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
135         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
136     }
137 
138     /**
139      * Creates a new timer with the default tick duration and default number of
140      * ticks per wheel.
141      *
142      * @param threadFactory  a {@link ThreadFactory} that creates a
143      *                       background {@link Thread} which is dedicated to
144      *                       {@link TimerTask} execution.
145      */
146     public HashedWheelTimer(ThreadFactory threadFactory) {
147         this(threadFactory, 100, TimeUnit.MILLISECONDS);
148     }
149 
150     /**
151      * Creates a new timer with the default number of ticks per wheel.
152      *
153      * @param threadFactory  a {@link ThreadFactory} that creates a
154      *                       background {@link Thread} which is dedicated to
155      *                       {@link TimerTask} execution.
156      * @param tickDuration   the duration between tick
157      * @param unit           the time unit of the {@code tickDuration}
158      */
159     public HashedWheelTimer(
160             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
161         this(threadFactory, tickDuration, unit, 512);
162     }
163 
164     /**
165      * Creates a new timer.
166      *
167      * @param threadFactory  a {@link ThreadFactory} that creates a
168      *                       background {@link Thread} which is dedicated to
169      *                       {@link TimerTask} execution.
170      * @param tickDuration   the duration between tick
171      * @param unit           the time unit of the {@code tickDuration}
172      * @param ticksPerWheel  the size of the wheel
173      */
174     public HashedWheelTimer(
175             ThreadFactory threadFactory,
176             long tickDuration, TimeUnit unit, int ticksPerWheel) {
177         this(threadFactory, null, tickDuration, unit, ticksPerWheel);
178     }
179 
180     /**
181      * Creates a new timer.
182      *
183      * @param threadFactory  a {@link ThreadFactory} that creates a
184      *                       background {@link Thread} which is dedicated to
185      *                       {@link TimerTask} execution.
186      * @param determiner     thread name determiner to control thread name.
187      * @param tickDuration   the duration between tick
188      * @param unit           the time unit of the {@code tickDuration}
189      * @param ticksPerWheel  the size of the wheel
190      */
191     public HashedWheelTimer(
192             ThreadFactory threadFactory,
193             ThreadNameDeterminer determiner,
194             long tickDuration, TimeUnit unit, int ticksPerWheel) {
195 
196         if (threadFactory == null) {
197             throw new NullPointerException("threadFactory");
198         }
199         if (unit == null) {
200             throw new NullPointerException("unit");
201         }
202         if (tickDuration <= 0) {
203             throw new IllegalArgumentException(
204                     "tickDuration must be greater than 0: " + tickDuration);
205         }
206         if (ticksPerWheel <= 0) {
207             throw new IllegalArgumentException(
208                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
209         }
210 
211         // Normalize ticksPerWheel to power of two and initialize the wheel.
212         wheel = createWheel(ticksPerWheel);
213         iterators = createIterators(wheel);
214         mask = wheel.length - 1;
215 
216         // Convert tickDuration to nanos.
217         this.tickDuration = unit.toNanos(tickDuration);
218 
219         // Prevent overflow.
220         if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
221             throw new IllegalArgumentException(String.format(
222                     "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
223                     tickDuration, Long.MAX_VALUE / wheel.length));
224         }
225 
226         workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
227                         worker, "Hashed wheel timer #" + id.incrementAndGet(),
228                         determiner));
229 
230         // Misuse check
231         misuseDetector.increase();
232     }
233 
234     @SuppressWarnings("unchecked")
235     private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
236         if (ticksPerWheel <= 0) {
237             throw new IllegalArgumentException(
238                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
239         }
240         if (ticksPerWheel > 1073741824) {
241             throw new IllegalArgumentException(
242                     "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
243         }
244 
245         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
246         Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
247         for (int i = 0; i < wheel.length; i ++) {
248             wheel[i] = new MapBackedSet<HashedWheelTimeout>(
249                     new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
250         }
251         return wheel;
252     }
253 
254     @SuppressWarnings("unchecked")
255     private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
256         ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
257         for (int i = 0; i < wheel.length; i ++) {
258             iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
259         }
260         return iterators;
261     }
262 
263     private static int normalizeTicksPerWheel(int ticksPerWheel) {
264         int normalizedTicksPerWheel = 1;
265         while (normalizedTicksPerWheel < ticksPerWheel) {
266             normalizedTicksPerWheel <<= 1;
267         }
268         return normalizedTicksPerWheel;
269     }
270 
271     /**
272      * Starts the background thread explicitly.  The background thread will
273      * start automatically on demand even if you did not call this method.
274      *
275      * @throws IllegalStateException if this timer has been
276      *                               {@linkplain #stop() stopped} already
277      */
278     public void start() {
279         switch (workerState.get()) {
280         case WORKER_STATE_INIT:
281             if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
282                 workerThread.start();
283             }
284             break;
285         case WORKER_STATE_STARTED:
286             break;
287         case WORKER_STATE_SHUTDOWN:
288             throw new IllegalStateException("cannot be started once stopped");
289         default:
290             throw new Error("Invalid WorkerState");
291         }
292     }
293 
294     public Set<Timeout> stop() {
295         if (Thread.currentThread() == workerThread) {
296             throw new IllegalStateException(
297                     HashedWheelTimer.class.getSimpleName() +
298                     ".stop() cannot be called from " +
299                     TimerTask.class.getSimpleName());
300         }
301 
302         if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
303             // workerState can be 0 or 2 at this moment - let it always be 2.
304             workerState.set(WORKER_STATE_SHUTDOWN);
305             return Collections.emptySet();
306         }
307 
308         boolean interrupted = false;
309         while (workerThread.isAlive()) {
310             workerThread.interrupt();
311             try {
312                 workerThread.join(100);
313             } catch (InterruptedException e) {
314                 interrupted = true;
315             }
316         }
317 
318         if (interrupted) {
319             Thread.currentThread().interrupt();
320         }
321 
322         misuseDetector.decrease();
323 
324         Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
325         for (Set<HashedWheelTimeout> bucket: wheel) {
326             unprocessedTimeouts.addAll(bucket);
327             bucket.clear();
328         }
329 
330         return Collections.unmodifiableSet(unprocessedTimeouts);
331     }
332 
333     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
334         final long currentTime = System.nanoTime();
335 
336         if (task == null) {
337             throw new NullPointerException("task");
338         }
339         if (unit == null) {
340             throw new NullPointerException("unit");
341         }
342 
343         start();
344 
345         long delayInNanos = unit.toNanos(delay);
346         HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delayInNanos);
347         scheduleTimeout(timeout, delayInNanos);
348         return timeout;
349     }
350 
351     void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
352 
353         // Prepare the required parameters to schedule the timeout object.
354         long relativeIndex = (delay + tickDuration - 1) / tickDuration;
355 
356         // if the previous line had an overflow going on, then we’ll just schedule this timeout
357         // one tick early; that shouldn’t matter since we’re talking 270 years here
358         if (relativeIndex < 0) {
359             relativeIndex = delay / tickDuration;
360         }
361         if (relativeIndex == 0) {
362             relativeIndex = 1;
363         }
364         if ((relativeIndex & mask) == 0) {
365             relativeIndex--;
366         }
367         final long remainingRounds = relativeIndex / wheel.length;
368 
369         // Add the timeout to the wheel.
370         lock.readLock().lock();
371         try {
372             if (workerState.get() == WORKER_STATE_SHUTDOWN) {
373                 throw new IllegalStateException("Cannot enqueue after shutdown");
374             }
375             final int stopIndex = (int) (wheelCursor + relativeIndex & mask);
376             timeout.stopIndex = stopIndex;
377             timeout.remainingRounds = remainingRounds;
378 
379             wheel[stopIndex].add(timeout);
380         } finally {
381             lock.readLock().unlock();
382         }
383     }
384 
385     private final class Worker implements Runnable {
386 
387         private long startTime;
388         private long tick;
389 
390         Worker() {
391         }
392 
393         public void run() {
394             List<HashedWheelTimeout> expiredTimeouts =
395                 new ArrayList<HashedWheelTimeout>();
396 
397             startTime = System.nanoTime();
398             tick = 1;
399 
400             while (workerState.get() == WORKER_STATE_STARTED) {
401                 final long deadline = waitForNextTick();
402                 if (deadline > 0) {
403                     fetchExpiredTimeouts(expiredTimeouts, deadline);
404                     notifyExpiredTimeouts(expiredTimeouts);
405                 }
406             }
407         }
408 
409         private void fetchExpiredTimeouts(
410                 List<HashedWheelTimeout> expiredTimeouts, long deadline) {
411 
412             // Find the expired timeouts and decrease the round counter
413             // if necessary.  Note that we don't send the notification
414             // immediately to make sure the listeners are called without
415             // an exclusive lock.
416             lock.writeLock().lock();
417             try {
418                 int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
419                 ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
420                 fetchExpiredTimeouts(expiredTimeouts, i, deadline);
421             } finally {
422                 lock.writeLock().unlock();
423             }
424         }
425 
426         private void fetchExpiredTimeouts(
427                 List<HashedWheelTimeout> expiredTimeouts,
428                 ReusableIterator<HashedWheelTimeout> i, long deadline) {
429 
430             List<HashedWheelTimeout> slipped = null;
431             i.rewind();
432             while (i.hasNext()) {
433                 HashedWheelTimeout timeout = i.next();
434                 if (timeout.remainingRounds <= 0) {
435                     i.remove();
436                     if (timeout.deadline <= deadline) {
437                         expiredTimeouts.add(timeout);
438                     } else {
439                         // Handle the case where the timeout is put into a wrong
440                         // place, usually one tick earlier.  For now, just add
441                         // it to a temporary list - we will reschedule it in a
442                         // separate loop.
443                         if (slipped == null) {
444                             slipped = new ArrayList<HashedWheelTimeout>();
445                         }
446                         slipped.add(timeout);
447                     }
448                 } else {
449                     timeout.remainingRounds --;
450                 }
451             }
452 
453             // Reschedule the slipped timeouts.
454             if (slipped != null) {
455                 for (HashedWheelTimeout timeout: slipped) {
456                     scheduleTimeout(timeout, timeout.deadline - deadline);
457                 }
458             }
459         }
460 
461         private void notifyExpiredTimeouts(
462                 List<HashedWheelTimeout> expiredTimeouts) {
463             // Notify the expired timeouts.
464             for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
465                 expiredTimeouts.get(i).expire();
466             }
467 
468             // Clean up the temporary list.
469             expiredTimeouts.clear();
470         }
471 
472        /**
473          * calculate goal nanoTime from startTime and current tick number,
474          * then wait until that goal has been reached.
475          * @return Long.MIN_VALUE if received a shutdown request,
476          * current time otherwise (with Long.MIN_VALUE changed by +1)
477          */
478         private long waitForNextTick() {
479             long deadline = startTime + tickDuration * tick;
480 
481             for (;;) {
482                 final long currentTime = System.nanoTime();
483                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
484 
485                 if (sleepTimeMs <= 0) {
486                     tick += 1;
487                     if (currentTime == Long.MIN_VALUE) {
488                         return -Long.MAX_VALUE;
489                     } else {
490                         return currentTime;
491                     }
492                 }
493 
494                 // Check if we run on windows, as if thats the case we will need
495                 // to round the sleepTime as workaround for a bug that only affect
496                 // the JVM if it runs on windows.
497                 //
498                 // See https://github.com/netty/netty/issues/356
499                 if (DetectionUtil.isWindows()) {
500                     sleepTimeMs = sleepTimeMs / 10 * 10;
501                 }
502                 try {
503                     Thread.sleep(sleepTimeMs);
504                 } catch (InterruptedException e) {
505                     if (workerState.get() == WORKER_STATE_SHUTDOWN) {
506                         return Long.MIN_VALUE;
507                     }
508                 }
509             }
510         }
511     }
512 
513     private final class HashedWheelTimeout implements Timeout {
514 
515         private static final int ST_INIT = 0;
516         private static final int ST_CANCELLED = 1;
517         private static final int ST_EXPIRED = 2;
518 
519         private final TimerTask task;
520         final long deadline;
521         volatile int stopIndex;
522         volatile long remainingRounds;
523         private final AtomicInteger state = new AtomicInteger(ST_INIT);
524 
525         HashedWheelTimeout(TimerTask task, long deadline) {
526             this.task = task;
527             this.deadline = deadline;
528         }
529 
530         public Timer getTimer() {
531             return HashedWheelTimer.this;
532         }
533 
534         public TimerTask getTask() {
535             return task;
536         }
537 
538         public void cancel() {
539             if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
540                 // TODO return false
541                 return;
542             }
543 
544             wheel[stopIndex].remove(this);
545         }
546 
547         public boolean isCancelled() {
548             return state.get() == ST_CANCELLED;
549         }
550 
551         public boolean isExpired() {
552             return state.get() != ST_INIT;
553         }
554 
555         public void expire() {
556             if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
557                 return;
558             }
559 
560             try {
561                 task.run(this);
562             } catch (Throwable t) {
563                 if (logger.isWarnEnabled()) {
564                     logger.warn(
565                             "An exception was thrown by " +
566                             TimerTask.class.getSimpleName() + '.', t);
567                 }
568             }
569         }
570 
571         @Override
572         public String toString() {
573             long currentTime = System.nanoTime();
574             long remaining = deadline - currentTime;
575 
576             StringBuilder buf = new StringBuilder(192);
577             buf.append(getClass().getSimpleName());
578             buf.append('(');
579 
580             buf.append("deadline: ");
581             if (remaining > 0) {
582                 buf.append(remaining);
583                 buf.append(" ms later, ");
584             } else if (remaining < 0) {
585                 buf.append(-remaining);
586                 buf.append(" ms ago, ");
587             } else {
588                 buf.append("now, ");
589             }
590 
591             if (isCancelled()) {
592                 buf.append(", cancelled");
593             }
594 
595             return buf.append(')').toString();
596         }
597     }
598 }