View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.util;
17  
18  import org.jboss.netty.channel.ChannelPipelineFactory;
19  import org.jboss.netty.logging.InternalLogger;
20  import org.jboss.netty.logging.InternalLoggerFactory;
21  import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
22  import org.jboss.netty.util.internal.DetectionUtil;
23  import org.jboss.netty.util.internal.ReusableIterator;
24  import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
25  
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Set;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.locks.ReadWriteLock;
36  import java.util.concurrent.locks.ReentrantReadWriteLock;
37  
38  /**
39   * A {@link Timer} optimized for approximated I/O timeout scheduling.
40   *
41   * <h3>Tick Duration</h3>
42   *
43   * As described with 'approximated', this timer does not execute the scheduled
44   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
45   * check if there are any {@link TimerTask}s behind the schedule and execute
46   * them.
47   * <p>
48   * You can increase or decrease the accuracy of the execution timing by
49   * specifying smaller or larger tick duration in the constructor.  In most
50   * network applications, I/O timeout does not need to be accurate.  Therefore,
51   * the default tick duration is 100 milliseconds and you will not need to try
52   * different configurations in most cases.
53   *
54   * <h3>Ticks per Wheel (Wheel Size)</h3>
55   *
56   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
57   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
58   * function is 'dead line of the task'.  The default number of ticks per wheel
59   * (i.e. the size of the wheel) is 512.  You could specify a larger value
60   * if you are going to schedule a lot of timeouts.
61   *
62   * <h3>Do not create many instances.</h3>
63   *
64   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
65   * started.  Therefore, you should make sure to create only one instance and
66   * share it across your application.  One of the common mistakes, that makes
67   * your application unresponsive, is to create a new instance in
68   * {@link ChannelPipelineFactory}, which results in the creation of a new thread
69   * for every connection.
70   *
71   * <h3>Implementation Details</h3>
72   *
73   * {@link HashedWheelTimer} is based on
74   * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
75   * Tony Lauck's paper,
76   * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
77   * and Hierarchical Timing Wheels: data structures to efficiently implement a
78   * timer facility'</a>.  More comprehensive slides are located
79   * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
80   */
81  public class HashedWheelTimer implements Timer {
82  
83      static final InternalLogger logger =
84          InternalLoggerFactory.getInstance(HashedWheelTimer.class);
85      private static final AtomicInteger id = new AtomicInteger();
86  
87      private static final SharedResourceMisuseDetector misuseDetector =
88          new SharedResourceMisuseDetector(HashedWheelTimer.class);
89  
90      private final Worker worker = new Worker();
91      final Thread workerThread;
92      final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down
93  
94      private final long roundDuration;
95      final long tickDuration;
96      final Set<HashedWheelTimeout>[] wheel;
97      final ReusableIterator<HashedWheelTimeout>[] iterators;
98      final int mask;
99      final ReadWriteLock lock = new ReentrantReadWriteLock();
100     volatile int wheelCursor;
101 
102     /**
103      * Creates a new timer with the default thread factory
104      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
105      * default number of ticks per wheel.
106      */
107     public HashedWheelTimer() {
108         this(Executors.defaultThreadFactory());
109     }
110 
111     /**
112      * Creates a new timer with the default thread factory
113      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
114      * per wheel.
115      *
116      * @param tickDuration   the duration between tick
117      * @param unit           the time unit of the {@code tickDuration}
118      */
119     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
120         this(Executors.defaultThreadFactory(), tickDuration, unit);
121     }
122 
123     /**
124      * Creates a new timer with the default thread factory
125      * ({@link Executors#defaultThreadFactory()}).
126      *
127      * @param tickDuration   the duration between tick
128      * @param unit           the time unit of the {@code tickDuration}
129      * @param ticksPerWheel  the size of the wheel
130      */
131     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
132         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
133     }
134 
135     /**
136      * Creates a new timer with the default tick duration and default number of
137      * ticks per wheel.
138      *
139      * @param threadFactory  a {@link ThreadFactory} that creates a
140      *                       background {@link Thread} which is dedicated to
141      *                       {@link TimerTask} execution.
142      */
143     public HashedWheelTimer(ThreadFactory threadFactory) {
144         this(threadFactory, 100, TimeUnit.MILLISECONDS);
145     }
146 
147     /**
148      * Creates a new timer with the default number of ticks per wheel.
149      *
150      * @param threadFactory  a {@link ThreadFactory} that creates a
151      *                       background {@link Thread} which is dedicated to
152      *                       {@link TimerTask} execution.
153      * @param tickDuration   the duration between tick
154      * @param unit           the time unit of the {@code tickDuration}
155      */
156     public HashedWheelTimer(
157             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
158         this(threadFactory, tickDuration, unit, 512);
159     }
160 
161     /**
162      * Creates a new timer.
163      *
164      * @param threadFactory  a {@link ThreadFactory} that creates a
165      *                       background {@link Thread} which is dedicated to
166      *                       {@link TimerTask} execution.
167      * @param tickDuration   the duration between tick
168      * @param unit           the time unit of the {@code tickDuration}
169      * @param ticksPerWheel  the size of the wheel
170      */
171     public HashedWheelTimer(
172             ThreadFactory threadFactory,
173             long tickDuration, TimeUnit unit, int ticksPerWheel) {
174 
175         if (threadFactory == null) {
176             throw new NullPointerException("threadFactory");
177         }
178         if (unit == null) {
179             throw new NullPointerException("unit");
180         }
181         if (tickDuration <= 0) {
182             throw new IllegalArgumentException(
183                     "tickDuration must be greater than 0: " + tickDuration);
184         }
185         if (ticksPerWheel <= 0) {
186             throw new IllegalArgumentException(
187                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
188         }
189 
190         // Normalize ticksPerWheel to power of two and initialize the wheel.
191         wheel = createWheel(ticksPerWheel);
192         iterators = createIterators(wheel);
193         mask = wheel.length - 1;
194 
195         // Convert tickDuration to milliseconds.
196         this.tickDuration = tickDuration = unit.toMillis(tickDuration);
197 
198         // Prevent overflow.
199         if (tickDuration == Long.MAX_VALUE ||
200                 tickDuration >= Long.MAX_VALUE / wheel.length) {
201             throw new IllegalArgumentException(
202                     "tickDuration is too long: " +
203                     tickDuration +  ' ' + unit);
204         }
205 
206         roundDuration = tickDuration * wheel.length;
207 
208         workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
209                         worker, "Hashed wheel timer #" + id.incrementAndGet()));
210 
211         // Misuse check
212         misuseDetector.increase();
213     }
214 
215     @SuppressWarnings("unchecked")
216     private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
217         if (ticksPerWheel <= 0) {
218             throw new IllegalArgumentException(
219                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
220         }
221         if (ticksPerWheel > 1073741824) {
222             throw new IllegalArgumentException(
223                     "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
224         }
225 
226         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
227         Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
228         for (int i = 0; i < wheel.length; i ++) {
229             wheel[i] = new MapBackedSet<HashedWheelTimeout>(
230                     new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
231         }
232         return wheel;
233     }
234 
235     @SuppressWarnings("unchecked")
236     private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
237         ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
238         for (int i = 0; i < wheel.length; i ++) {
239             iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
240         }
241         return iterators;
242     }
243 
244     private static int normalizeTicksPerWheel(int ticksPerWheel) {
245         int normalizedTicksPerWheel = 1;
246         while (normalizedTicksPerWheel < ticksPerWheel) {
247             normalizedTicksPerWheel <<= 1;
248         }
249         return normalizedTicksPerWheel;
250     }
251 
252     /**
253      * Starts the background thread explicitly.  The background thread will
254      * start automatically on demand even if you did not call this method.
255      *
256      * @throws IllegalStateException if this timer has been
257      *                               {@linkplain #stop() stopped} already
258      */
259     public void start() {
260         switch (workerState.get()) {
261         case 0:
262             if (workerState.compareAndSet(0, 1)) {
263                 workerThread.start();
264             }
265             break;
266         case 1:
267             break;
268         case 2:
269             throw new IllegalStateException("cannot be started once stopped");
270         default:
271             throw new Error();
272         }
273     }
274 
275     public Set<Timeout> stop() {
276         if (Thread.currentThread() == workerThread) {
277             throw new IllegalStateException(
278                     HashedWheelTimer.class.getSimpleName() +
279                     ".stop() cannot be called from " +
280                     TimerTask.class.getSimpleName());
281         }
282 
283         if (workerState.getAndSet(2) != 1) {
284             // workerState wasn't 1, so return an empty set
285             return Collections.emptySet();
286         }
287 
288         boolean interrupted = false;
289         while (workerThread.isAlive()) {
290             workerThread.interrupt();
291             try {
292                 workerThread.join(100);
293             } catch (InterruptedException e) {
294                 interrupted = true;
295             }
296         }
297 
298         if (interrupted) {
299             Thread.currentThread().interrupt();
300         }
301 
302         misuseDetector.decrease();
303 
304         Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
305         for (Set<HashedWheelTimeout> bucket: wheel) {
306             unprocessedTimeouts.addAll(bucket);
307             bucket.clear();
308         }
309 
310         return Collections.unmodifiableSet(unprocessedTimeouts);
311     }
312 
313     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
314         final long currentTime = System.currentTimeMillis();
315 
316         if (task == null) {
317             throw new NullPointerException("task");
318         }
319         if (unit == null) {
320             throw new NullPointerException("unit");
321         }
322 
323         start();
324 
325         delay = unit.toMillis(delay);
326         HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay);
327         scheduleTimeout(timeout, delay);
328         return timeout;
329     }
330 
331     void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
332         // delay must be equal to or greater than tickDuration so that the
333         // worker thread never misses the timeout.
334         if (delay < tickDuration) {
335             delay = tickDuration;
336         }
337 
338         // Prepare the required parameters to schedule the timeout object.
339         final long lastRoundDelay = delay % roundDuration;
340         final long lastTickDelay = delay % tickDuration;
341         final long relativeIndex =
342             lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
343 
344         final long remainingRounds =
345             delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
346 
347         // Add the timeout to the wheel.
348         lock.readLock().lock();
349         try {
350             int stopIndex = (int) (wheelCursor + relativeIndex & mask);
351             timeout.stopIndex = stopIndex;
352             timeout.remainingRounds = remainingRounds;
353 
354             wheel[stopIndex].add(timeout);
355         } finally {
356             lock.readLock().unlock();
357         }
358     }
359 
360     private final class Worker implements Runnable {
361 
362         private long startTime;
363         private long tick;
364 
365         Worker() {
366         }
367 
368         public void run() {
369             List<HashedWheelTimeout> expiredTimeouts =
370                 new ArrayList<HashedWheelTimeout>();
371 
372             startTime = System.currentTimeMillis();
373             tick = 1;
374 
375             while (workerState.get() == 1) {
376                 final long deadline = waitForNextTick();
377                 if (deadline > 0) {
378                     fetchExpiredTimeouts(expiredTimeouts, deadline);
379                     notifyExpiredTimeouts(expiredTimeouts);
380                 }
381             }
382         }
383 
384         private void fetchExpiredTimeouts(
385                 List<HashedWheelTimeout> expiredTimeouts, long deadline) {
386 
387             // Find the expired timeouts and decrease the round counter
388             // if necessary.  Note that we don't send the notification
389             // immediately to make sure the listeners are called without
390             // an exclusive lock.
391             lock.writeLock().lock();
392             try {
393                 int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
394                 ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
395                 fetchExpiredTimeouts(expiredTimeouts, i, deadline);
396             } finally {
397                 lock.writeLock().unlock();
398             }
399         }
400 
401         private void fetchExpiredTimeouts(
402                 List<HashedWheelTimeout> expiredTimeouts,
403                 ReusableIterator<HashedWheelTimeout> i, long deadline) {
404 
405             List<HashedWheelTimeout> slipped = null;
406             i.rewind();
407             while (i.hasNext()) {
408                 HashedWheelTimeout timeout = i.next();
409                 if (timeout.remainingRounds <= 0) {
410                     i.remove();
411                     if (timeout.deadline <= deadline) {
412                         expiredTimeouts.add(timeout);
413                     } else {
414                         // Handle the case where the timeout is put into a wrong
415                         // place, usually one tick earlier.  For now, just add
416                         // it to a temporary list - we will reschedule it in a
417                         // separate loop.
418                         if (slipped == null) {
419                             slipped = new ArrayList<HashedWheelTimer.HashedWheelTimeout>();
420                         }
421                         slipped.add(timeout);
422                     }
423                 } else {
424                     timeout.remainingRounds --;
425                 }
426             }
427 
428             // Reschedule the slipped timeouts.
429             if (slipped != null) {
430                 for (HashedWheelTimeout timeout: slipped) {
431                     scheduleTimeout(timeout, timeout.deadline - deadline);
432                 }
433             }
434         }
435 
436         private void notifyExpiredTimeouts(
437                 List<HashedWheelTimeout> expiredTimeouts) {
438             // Notify the expired timeouts.
439             for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
440                 expiredTimeouts.get(i).expire();
441             }
442 
443             // Clean up the temporary list.
444             expiredTimeouts.clear();
445         }
446 
447         private long waitForNextTick() {
448             long deadline = startTime + tickDuration * tick;
449 
450             for (;;) {
451                 final long currentTime = System.currentTimeMillis();
452                 long sleepTime = tickDuration * tick - (currentTime - startTime);
453 
454                 // Check if we run on windows, as if thats the case we will need
455                 // to round the sleepTime as workaround for a bug that only affect
456                 // the JVM if it runs on windows.
457                 //
458                 // See https://github.com/netty/netty/issues/356
459                 if (DetectionUtil.isWindows()) {
460                     sleepTime = sleepTime / 10 * 10;
461                 }
462 
463                 if (sleepTime <= 0) {
464                     break;
465                 }
466                 try {
467                     Thread.sleep(sleepTime);
468                 } catch (InterruptedException e) {
469                     if (workerState.get() != 1) {
470                         return -1;
471                     }
472                 }
473             }
474 
475             // Increase the tick.
476             tick ++;
477             return deadline;
478         }
479     }
480 
481     private final class HashedWheelTimeout implements Timeout {
482 
483         private static final int ST_INIT = 0;
484         private static final int ST_CANCELLED = 1;
485         private static final int ST_EXPIRED = 2;
486 
487         private final TimerTask task;
488         final long deadline;
489         volatile int stopIndex;
490         volatile long remainingRounds;
491         private final AtomicInteger state = new AtomicInteger(ST_INIT);
492 
493         HashedWheelTimeout(TimerTask task, long deadline) {
494             this.task = task;
495             this.deadline = deadline;
496         }
497 
498         public Timer getTimer() {
499             return HashedWheelTimer.this;
500         }
501 
502         public TimerTask getTask() {
503             return task;
504         }
505 
506         public void cancel() {
507             if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
508                 // TODO return false
509                 return;
510             }
511 
512             wheel[stopIndex].remove(this);
513         }
514 
515         public boolean isCancelled() {
516             return state.get() == ST_CANCELLED;
517         }
518 
519         public boolean isExpired() {
520             return state.get() != ST_INIT;
521         }
522 
523         public void expire() {
524             if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
525                 return;
526             }
527 
528             try {
529                 task.run(this);
530             } catch (Throwable t) {
531                 if (logger.isWarnEnabled()) {
532                     logger.warn(
533                             "An exception was thrown by " +
534                             TimerTask.class.getSimpleName() + '.', t);
535                 }
536 
537             }
538         }
539 
540         @Override
541         public String toString() {
542             long currentTime = System.currentTimeMillis();
543             long remaining = deadline - currentTime;
544 
545             StringBuilder buf = new StringBuilder(192);
546             buf.append(getClass().getSimpleName());
547             buf.append('(');
548 
549             buf.append("deadline: ");
550             if (remaining > 0) {
551                 buf.append(remaining);
552                 buf.append(" ms later, ");
553             } else if (remaining < 0) {
554                 buf.append(-remaining);
555                 buf.append(" ms ago, ");
556             } else {
557                 buf.append("now, ");
558             }
559 
560             if (isCancelled()) {
561                 buf.append(", cancelled");
562             }
563 
564             return buf.append(')').toString();
565         }
566     }
567 }