View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.DefaultPriorityQueue;
19  import io.netty.util.internal.ObjectUtil;
20  import io.netty.util.internal.PriorityQueue;
21  
22  import java.util.Comparator;
23  import java.util.Objects;
24  import java.util.Queue;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.TimeUnit;
27  
28  /**
29   * Abstract base class for {@link EventExecutor}s that want to support scheduling.
30   */
31  public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
32      private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
33              new Comparator<ScheduledFutureTask<?>>() {
34                  @Override
35                  public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
36                      return o1.compareTo(o2);
37                  }
38              };
39  
40      private static final long START_TIME = System.nanoTime();
41  
42      static final Runnable WAKEUP_TASK = new Runnable() {
43         @Override
44         public void run() { } // Do nothing
45      };
46  
47      PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
48  
49      long nextTaskId;
50  
51      protected AbstractScheduledEventExecutor() {
52      }
53  
54      protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
55          super(parent);
56      }
57  
58      /**
59       * Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()}
60       * for two reasons:
61       *
62       * <ul>
63       *     <li>We apply a fixed offset to the {@link System#nanoTime() nanoTime}</li>
64       *     <li>Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time
65       *     for testing purposes.</li>
66       * </ul>
67       */
68      protected long getCurrentTimeNanos() {
69          return defaultCurrentTimeNanos();
70      }
71  
72      /**
73       * @deprecated Use the non-static {@link #getCurrentTimeNanos()} instead.
74       */
75      @Deprecated
76      protected static long nanoTime() {
77          return defaultCurrentTimeNanos();
78      }
79  
80      static long defaultCurrentTimeNanos() {
81          return System.nanoTime() - START_TIME;
82      }
83  
84      static long deadlineNanos(long nanoTime, long delay) {
85          long deadlineNanos = nanoTime + delay;
86          // Guard against overflow
87          return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
88      }
89  
90      /**
91       * Given an arbitrary deadline {@code deadlineNanos}, calculate the number of nano seconds from now
92       * {@code deadlineNanos} would expire.
93       * @param deadlineNanos An arbitrary deadline in nano seconds.
94       * @return the number of nano seconds from now {@code deadlineNanos} would expire.
95       */
96      protected static long deadlineToDelayNanos(long deadlineNanos) {
97          return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
98      }
99  
100     /**
101      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
102      */
103     protected long delayNanos(long currentTimeNanos, long scheduledPurgeInterval) {
104         currentTimeNanos -= initialNanoTime();
105 
106         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
107         if (scheduledTask == null) {
108             return scheduledPurgeInterval;
109         }
110 
111         return scheduledTask.delayNanos(currentTimeNanos);
112     }
113 
114     /**
115      * The initial value used for delay and computations based upon a monatomic time source.
116      * @return initial value used for delay and computations based upon a monatomic time source.
117      */
118     protected static long initialNanoTime() {
119         return START_TIME;
120     }
121 
122     PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
123         if (scheduledTaskQueue == null) {
124             scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
125                     SCHEDULED_FUTURE_TASK_COMPARATOR,
126                     // Use same initial capacity as java.util.PriorityQueue
127                     11);
128         }
129         return scheduledTaskQueue;
130     }
131 
132     private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
133         return queue == null || queue.isEmpty();
134     }
135 
136     /**
137      * Cancel all scheduled tasks.
138      *
139      * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
140      */
141     protected void cancelScheduledTasks() {
142         assert inEventLoop();
143         PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
144         if (isNullOrEmpty(scheduledTaskQueue)) {
145             return;
146         }
147 
148         final ScheduledFutureTask<?>[] scheduledTasks =
149                 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
150 
151         for (ScheduledFutureTask<?> task: scheduledTasks) {
152             task.cancelWithoutRemove(false);
153         }
154 
155         scheduledTaskQueue.clearIgnoringIndexes();
156     }
157 
158     /**
159      * @see #pollScheduledTask(long)
160      */
161     protected final Runnable pollScheduledTask() {
162         return pollScheduledTask(getCurrentTimeNanos());
163     }
164 
165     /**
166      * Fetch scheduled tasks from the internal queue and add these to the given {@link Queue}.
167      *
168      * @param taskQueue the task queue into which the fetched scheduled tasks should be transferred.
169      * @return {@code true} if we were able to transfer everything, {@code false} if we need to call this method again
170      *         as soon as there is space again in {@code taskQueue}.
171      */
172     protected boolean fetchFromScheduledTaskQueue(Queue<Runnable> taskQueue) {
173         assert inEventLoop();
174         Objects.requireNonNull(taskQueue, "taskQueue");
175         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
176             return true;
177         }
178         long nanoTime = getCurrentTimeNanos();
179         for (;;) {
180             Runnable scheduledTask = pollScheduledTask(nanoTime);
181             if (scheduledTask == null) {
182                 return true;
183             }
184             if (!taskQueue.offer(scheduledTask)) {
185                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
186                 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
187                 return false;
188             }
189         }
190     }
191 
192     /**
193      * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
194      * You should use {@link #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}.
195      */
196     protected final Runnable pollScheduledTask(long nanoTime) {
197         assert inEventLoop();
198 
199         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
200         if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
201             return null;
202         }
203         scheduledTaskQueue.remove();
204         scheduledTask.setConsumed();
205         return scheduledTask;
206     }
207 
208     /**
209      * Return the nanoseconds until the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
210      */
211     protected final long nextScheduledTaskNano() {
212         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
213         return scheduledTask != null ? scheduledTask.delayNanos() : -1;
214     }
215 
216     /**
217      * Return the deadline (in nanoseconds) when the next scheduled task is ready to be run or {@code -1}
218      * if no task is scheduled.
219      */
220     protected final long nextScheduledTaskDeadlineNanos() {
221         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
222         return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
223     }
224 
225     final ScheduledFutureTask<?> peekScheduledTask() {
226         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
227         return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
228     }
229 
230     /**
231      * Returns {@code true} if a scheduled task is ready for processing.
232      */
233     protected final boolean hasScheduledTasks() {
234         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
235         return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
236     }
237 
238     @Override
239     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
240         ObjectUtil.checkNotNull(command, "command");
241         ObjectUtil.checkNotNull(unit, "unit");
242         if (delay < 0) {
243             delay = 0;
244         }
245         validateScheduled0(delay, unit);
246 
247         return schedule(new ScheduledFutureTask<Void>(
248                 this,
249                 command,
250                 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
251     }
252 
253     @Override
254     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
255         ObjectUtil.checkNotNull(callable, "callable");
256         ObjectUtil.checkNotNull(unit, "unit");
257         if (delay < 0) {
258             delay = 0;
259         }
260         validateScheduled0(delay, unit);
261 
262         return schedule(new ScheduledFutureTask<V>(
263                 this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
264     }
265 
266     @Override
267     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
268         ObjectUtil.checkNotNull(command, "command");
269         ObjectUtil.checkNotNull(unit, "unit");
270         if (initialDelay < 0) {
271             throw new IllegalArgumentException(
272                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
273         }
274         if (period <= 0) {
275             throw new IllegalArgumentException(
276                     String.format("period: %d (expected: > 0)", period));
277         }
278         validateScheduled0(initialDelay, unit);
279         validateScheduled0(period, unit);
280 
281         return schedule(new ScheduledFutureTask<Void>(
282                 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
283     }
284 
285     @Override
286     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
287         ObjectUtil.checkNotNull(command, "command");
288         ObjectUtil.checkNotNull(unit, "unit");
289         if (initialDelay < 0) {
290             throw new IllegalArgumentException(
291                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
292         }
293         if (delay <= 0) {
294             throw new IllegalArgumentException(
295                     String.format("delay: %d (expected: > 0)", delay));
296         }
297 
298         validateScheduled0(initialDelay, unit);
299         validateScheduled0(delay, unit);
300 
301         return schedule(new ScheduledFutureTask<Void>(
302                 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
303     }
304 
305     @SuppressWarnings("deprecation")
306     private void validateScheduled0(long amount, TimeUnit unit) {
307         validateScheduled(amount, unit);
308     }
309 
310     /**
311      * Sub-classes may override this to restrict the maximal amount of time someone can use to schedule a task.
312      *
313      * @deprecated will be removed in the future.
314      */
315     @Deprecated
316     protected void validateScheduled(long amount, TimeUnit unit) {
317         // NOOP
318     }
319 
320     final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
321         // nextTaskId a long and so there is no chance it will overflow back to 0
322         scheduledTaskQueue().add(task.setId(++nextTaskId));
323     }
324 
325     private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
326         if (inEventLoop()) {
327             scheduleFromEventLoop(task);
328         } else {
329             final long deadlineNanos = task.deadlineNanos();
330             // task will add itself to scheduled task queue when run if not expired
331             if (beforeScheduledTaskSubmitted(deadlineNanos)) {
332                 execute(task);
333             } else {
334                 lazyExecute(task);
335                 // Second hook after scheduling to facilitate race-avoidance
336                 if (afterScheduledTaskSubmitted(deadlineNanos)) {
337                     execute(WAKEUP_TASK);
338                 }
339             }
340         }
341 
342         return task;
343     }
344 
345     final void removeScheduled(final ScheduledFutureTask<?> task) {
346         assert task.isCancelled();
347         if (inEventLoop()) {
348             scheduledTaskQueue().removeTyped(task);
349         } else {
350             // task will remove itself from scheduled task queue when it runs
351             scheduleRemoveScheduled(task);
352         }
353     }
354 
355     void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
356         // task will remove itself from scheduled task queue when it runs
357         lazyExecute(task);
358     }
359 
360     /**
361      * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission.
362      * Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to
363      * process the scheduled task (if not already awake).
364      * <p>
365      * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with
366      * the same value <i>after</i> the scheduled task is enqueued, providing another opportunity
367      * to wake the {@link EventExecutor} thread if required.
368      *
369      * @param deadlineNanos deadline of the to-be-scheduled task
370      *     relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
371      * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
372      */
373     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
374         return true;
375     }
376 
377     /**
378      * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
379      *
380      * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
381      * @return  {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
382      */
383     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
384         return true;
385     }
386 }