View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.DefaultPriorityQueue;
19  import io.netty.util.internal.ObjectUtil;
20  import io.netty.util.internal.PriorityQueue;
21  
22  import java.util.Comparator;
23  import java.util.Objects;
24  import java.util.Queue;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.TimeUnit;
27  
28  /**
29   * Abstract base class for {@link EventExecutor}s that want to support scheduling.
30   */
31  public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
32      private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
33              new Comparator<ScheduledFutureTask<?>>() {
34                  @Override
35                  public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
36                      return o1.compareTo(o2);
37                  }
38              };
39  
40      static final Runnable WAKEUP_TASK = new Runnable() {
41         @Override
42         public void run() { } // Do nothing
43      };
44  
45      PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
46  
47      long nextTaskId;
48  
49      protected AbstractScheduledEventExecutor() {
50      }
51  
52      protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
53          super(parent);
54      }
55  
56      @Override
57      public Ticker ticker() {
58          return Ticker.systemTicker();
59      }
60  
61      /**
62       * Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()}
63       * for two reasons:
64       *
65       * <ul>
66       *     <li>We apply a fixed offset to the {@link System#nanoTime() nanoTime}</li>
67       *     <li>Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time
68       *     for testing purposes.</li>
69       * </ul>
70       *
71       * @deprecated Please use (or override) {@link #ticker()} instead. This method delegates to {@link #ticker()}. Old
72       * code may still call this method for compatibility.
73       */
74      @Deprecated
75      protected long getCurrentTimeNanos() {
76          return ticker().nanoTime();
77      }
78  
79      /**
80       * @deprecated Use the non-static {@link #ticker()} instead.
81       */
82      @Deprecated
83      protected static long nanoTime() {
84          return Ticker.systemTicker().nanoTime();
85      }
86  
87      /**
88       * @deprecated Use the non-static {@link #ticker()} instead.
89       */
90      @Deprecated
91      static long defaultCurrentTimeNanos() {
92          return Ticker.systemTicker().nanoTime();
93      }
94  
95      static long deadlineNanos(long nanoTime, long delay) {
96          long deadlineNanos = nanoTime + delay;
97          // Guard against overflow
98          return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
99      }
100 
101     /**
102      * Given an arbitrary deadline {@code deadlineNanos}, calculate the number of nano seconds from now
103      * {@code deadlineNanos} would expire.
104      * @param deadlineNanos An arbitrary deadline in nano seconds.
105      * @return the number of nano seconds from now {@code deadlineNanos} would expire.
106      * @deprecated Use {@link #ticker()} instead
107      */
108     @Deprecated
109     protected static long deadlineToDelayNanos(long deadlineNanos) {
110         return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
111     }
112 
113     /**
114      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
115      */
116     protected long delayNanos(long currentTimeNanos, long scheduledPurgeInterval) {
117         currentTimeNanos -= ticker().initialNanoTime();
118 
119         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
120         if (scheduledTask == null) {
121             return scheduledPurgeInterval;
122         }
123 
124         return scheduledTask.delayNanos(currentTimeNanos);
125     }
126 
127     /**
128      * The initial value used for delay and computations based upon a monatomic time source.
129      * @return initial value used for delay and computations based upon a monatomic time source.
130      * @deprecated Use {@link #ticker()} instead
131      */
132     @Deprecated
133     protected static long initialNanoTime() {
134         return Ticker.systemTicker().initialNanoTime();
135     }
136 
137     PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
138         if (scheduledTaskQueue == null) {
139             scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
140                     SCHEDULED_FUTURE_TASK_COMPARATOR,
141                     // Use same initial capacity as java.util.PriorityQueue
142                     11);
143         }
144         return scheduledTaskQueue;
145     }
146 
147     private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
148         return queue == null || queue.isEmpty();
149     }
150 
151     /**
152      * Cancel all scheduled tasks.
153      *
154      * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
155      */
156     protected void cancelScheduledTasks() {
157         assert inEventLoop();
158         PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
159         if (isNullOrEmpty(scheduledTaskQueue)) {
160             return;
161         }
162 
163         final ScheduledFutureTask<?>[] scheduledTasks =
164                 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
165 
166         for (ScheduledFutureTask<?> task: scheduledTasks) {
167             task.cancelWithoutRemove(false);
168         }
169 
170         scheduledTaskQueue.clearIgnoringIndexes();
171     }
172 
173     /**
174      * @see #pollScheduledTask(long)
175      */
176     protected final Runnable pollScheduledTask() {
177         return pollScheduledTask(getCurrentTimeNanos());
178     }
179 
180     /**
181      * Fetch scheduled tasks from the internal queue and add these to the given {@link Queue}.
182      *
183      * @param taskQueue the task queue into which the fetched scheduled tasks should be transferred.
184      * @return {@code true} if we were able to transfer everything, {@code false} if we need to call this method again
185      *         as soon as there is space again in {@code taskQueue}.
186      */
187     protected boolean fetchFromScheduledTaskQueue(Queue<Runnable> taskQueue) {
188         assert inEventLoop();
189         Objects.requireNonNull(taskQueue, "taskQueue");
190         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
191             return true;
192         }
193         long nanoTime = getCurrentTimeNanos();
194         for (;;) {
195             Runnable scheduledTask = pollScheduledTask(nanoTime);
196             if (scheduledTask == null) {
197                 return true;
198             }
199             if (!taskQueue.offer(scheduledTask)) {
200                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
201                 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
202                 return false;
203             }
204         }
205     }
206 
207     /**
208      * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
209      * You should use {@link #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}.
210      */
211     protected final Runnable pollScheduledTask(long nanoTime) {
212         assert inEventLoop();
213 
214         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
215         if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
216             return null;
217         }
218         scheduledTaskQueue.remove();
219         scheduledTask.setConsumed();
220         return scheduledTask;
221     }
222 
223     /**
224      * Return the nanoseconds until the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
225      */
226     protected final long nextScheduledTaskNano() {
227         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
228         return scheduledTask != null ? scheduledTask.delayNanos() : -1;
229     }
230 
231     /**
232      * Return the deadline (in nanoseconds) when the next scheduled task is ready to be run or {@code -1}
233      * if no task is scheduled.
234      */
235     protected final long nextScheduledTaskDeadlineNanos() {
236         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
237         return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
238     }
239 
240     final ScheduledFutureTask<?> peekScheduledTask() {
241         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
242         return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
243     }
244 
245     /**
246      * Returns {@code true} if a scheduled task is ready for processing.
247      */
248     protected final boolean hasScheduledTasks() {
249         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
250         return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
251     }
252 
253     @Override
254     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
255         ObjectUtil.checkNotNull(command, "command");
256         ObjectUtil.checkNotNull(unit, "unit");
257         if (delay < 0) {
258             delay = 0;
259         }
260         validateScheduled0(delay, unit);
261 
262         return schedule(new ScheduledFutureTask<Void>(
263                 this,
264                 command,
265                 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
266     }
267 
268     @Override
269     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
270         ObjectUtil.checkNotNull(callable, "callable");
271         ObjectUtil.checkNotNull(unit, "unit");
272         if (delay < 0) {
273             delay = 0;
274         }
275         validateScheduled0(delay, unit);
276 
277         return schedule(new ScheduledFutureTask<V>(
278                 this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
279     }
280 
281     @Override
282     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
283         ObjectUtil.checkNotNull(command, "command");
284         ObjectUtil.checkNotNull(unit, "unit");
285         if (initialDelay < 0) {
286             throw new IllegalArgumentException(
287                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
288         }
289         if (period <= 0) {
290             throw new IllegalArgumentException(
291                     String.format("period: %d (expected: > 0)", period));
292         }
293         validateScheduled0(initialDelay, unit);
294         validateScheduled0(period, unit);
295 
296         return schedule(new ScheduledFutureTask<Void>(
297                 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
298     }
299 
300     @Override
301     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
302         ObjectUtil.checkNotNull(command, "command");
303         ObjectUtil.checkNotNull(unit, "unit");
304         if (initialDelay < 0) {
305             throw new IllegalArgumentException(
306                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
307         }
308         if (delay <= 0) {
309             throw new IllegalArgumentException(
310                     String.format("delay: %d (expected: > 0)", delay));
311         }
312 
313         validateScheduled0(initialDelay, unit);
314         validateScheduled0(delay, unit);
315 
316         return schedule(new ScheduledFutureTask<Void>(
317                 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
318     }
319 
320     @SuppressWarnings("deprecation")
321     private void validateScheduled0(long amount, TimeUnit unit) {
322         validateScheduled(amount, unit);
323     }
324 
325     /**
326      * Sub-classes may override this to restrict the maximal amount of time someone can use to schedule a task.
327      *
328      * @deprecated will be removed in the future.
329      */
330     @Deprecated
331     protected void validateScheduled(long amount, TimeUnit unit) {
332         // NOOP
333     }
334 
335     final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
336         // nextTaskId a long and so there is no chance it will overflow back to 0
337         scheduledTaskQueue().add(task.setId(++nextTaskId));
338     }
339 
340     private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
341         if (inEventLoop()) {
342             scheduleFromEventLoop(task);
343         } else {
344             final long deadlineNanos = task.deadlineNanos();
345             // task will add itself to scheduled task queue when run if not expired
346             if (beforeScheduledTaskSubmitted(deadlineNanos)) {
347                 execute(task);
348             } else {
349                 lazyExecute(task);
350                 // Second hook after scheduling to facilitate race-avoidance
351                 if (afterScheduledTaskSubmitted(deadlineNanos)) {
352                     execute(WAKEUP_TASK);
353                 }
354             }
355         }
356 
357         return task;
358     }
359 
360     final void removeScheduled(final ScheduledFutureTask<?> task) {
361         assert task.isCancelled();
362         if (inEventLoop()) {
363             scheduledTaskQueue().removeTyped(task);
364         } else {
365             // task will remove itself from scheduled task queue when it runs
366             scheduleRemoveScheduled(task);
367         }
368     }
369 
370     void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
371         // task will remove itself from scheduled task queue when it runs
372         lazyExecute(task);
373     }
374 
375     /**
376      * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission.
377      * Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to
378      * process the scheduled task (if not already awake).
379      * <p>
380      * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with
381      * the same value <i>after</i> the scheduled task is enqueued, providing another opportunity
382      * to wake the {@link EventExecutor} thread if required.
383      *
384      * @param deadlineNanos deadline of the to-be-scheduled task
385      *     relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
386      * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
387      */
388     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
389         return true;
390     }
391 
392     /**
393      * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
394      *
395      * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
396      * @return  {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
397      */
398     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
399         return true;
400     }
401 }