View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.DefaultPriorityQueue;
19  import io.netty.util.internal.ObjectUtil;
20  import io.netty.util.internal.PriorityQueue;
21  
22  import java.util.Comparator;
23  import java.util.Objects;
24  import java.util.Queue;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.TimeUnit;
27  
28  /**
29   * Abstract base class for {@link EventExecutor}s that want to support scheduling.
30   */
31  public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
32      private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
33              new Comparator<ScheduledFutureTask<?>>() {
34                  @Override
35                  public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
36                      return o1.compareTo(o2);
37                  }
38              };
39  
40      static final Runnable WAKEUP_TASK = new Runnable() {
41         @Override
42         public void run() { } // Do nothing
43      };
44  
45      PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
46  
47      long nextTaskId;
48  
49      protected AbstractScheduledEventExecutor() {
50      }
51  
52      protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
53          super(parent);
54      }
55  
56      @Override
57      public Ticker ticker() {
58          return Ticker.systemTicker();
59      }
60  
61      /**
62       * Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()}
63       * for two reasons:
64       *
65       * <ul>
66       *     <li>We apply a fixed offset to the {@link System#nanoTime() nanoTime}</li>
67       *     <li>Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time
68       *     for testing purposes.</li>
69       * </ul>
70       *
71       * @deprecated Please use (or override) {@link #ticker()} instead. This method delegates to {@link #ticker()}. Old
72       * code may still call this method for compatibility.
73       */
74      @Deprecated
75      protected long getCurrentTimeNanos() {
76          return ticker().nanoTime();
77      }
78  
79      /**
80       * @deprecated Use the non-static {@link #ticker()} instead.
81       */
82      @Deprecated
83      protected static long nanoTime() {
84          return Ticker.systemTicker().nanoTime();
85      }
86  
87      /**
88       * @deprecated Use the non-static {@link #ticker()} instead.
89       */
90      @Deprecated
91      static long defaultCurrentTimeNanos() {
92          return Ticker.systemTicker().nanoTime();
93      }
94  
95      static long deadlineNanos(long nanoTime, long delay) {
96          long deadlineNanos = nanoTime + delay;
97          // Guard against overflow
98          return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
99      }
100 
101     /**
102      * Given an arbitrary deadline {@code deadlineNanos}, calculate the number of nano seconds from now
103      * {@code deadlineNanos} would expire.
104      * @param deadlineNanos An arbitrary deadline in nano seconds.
105      * @return the number of nano seconds from now {@code deadlineNanos} would expire.
106      * @deprecated Use {@link #ticker()} instead
107      */
108     @Deprecated
109     protected static long deadlineToDelayNanos(long deadlineNanos) {
110         return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
111     }
112 
113     /**
114      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
115      */
116     protected long delayNanos(long currentTimeNanos, long scheduledPurgeInterval) {
117         currentTimeNanos -= ticker().initialNanoTime();
118 
119         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
120         if (scheduledTask == null) {
121             return scheduledPurgeInterval;
122         }
123 
124         return scheduledTask.delayNanos(currentTimeNanos);
125     }
126 
127     /**
128      * The initial value used for delay and computations based upon a monatomic time source.
129      * @return initial value used for delay and computations based upon a monatomic time source.
130      * @deprecated Use {@link #ticker()} instead
131      */
132     @Deprecated
133     protected static long initialNanoTime() {
134         return Ticker.systemTicker().initialNanoTime();
135     }
136 
137     PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
138         if (scheduledTaskQueue == null) {
139             scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
140                     SCHEDULED_FUTURE_TASK_COMPARATOR,
141                     // Use same initial capacity as java.util.PriorityQueue
142                     11);
143         }
144         return scheduledTaskQueue;
145     }
146 
147     private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
148         return queue == null || queue.isEmpty();
149     }
150 
151     /**
152      * Cancel all scheduled tasks.
153      *
154      * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
155      */
156     protected void cancelScheduledTasks() {
157         assert inEventLoop();
158         PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
159         if (isNullOrEmpty(scheduledTaskQueue)) {
160             return;
161         }
162 
163         final ScheduledFutureTask<?>[] scheduledTasks =
164                 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
165 
166         for (ScheduledFutureTask<?> task: scheduledTasks) {
167             task.cancelWithoutRemove(false);
168         }
169 
170         scheduledTaskQueue.clearIgnoringIndexes();
171     }
172 
173     /**
174      * @see #pollScheduledTask(long)
175      */
176     protected final Runnable pollScheduledTask() {
177         return pollScheduledTask(getCurrentTimeNanos());
178     }
179 
180     /**
181      * Fetch scheduled tasks from the internal queue and add these to the given {@link Queue}.
182      *
183      * @param taskQueue the task queue into which the fetched scheduled tasks should be transferred.
184      * @return {@code true} if we were able to transfer everything, {@code false} if we need to call this method again
185      *         as soon as there is space again in {@code taskQueue}.
186      */
187     protected boolean fetchFromScheduledTaskQueue(Queue<Runnable> taskQueue) {
188         assert inEventLoop();
189         Objects.requireNonNull(taskQueue, "taskQueue");
190         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
191             return true;
192         }
193         long nanoTime = getCurrentTimeNanos();
194         for (;;) {
195             ScheduledFutureTask scheduledTask = (ScheduledFutureTask) pollScheduledTask(nanoTime);
196             if (scheduledTask == null) {
197                 return true;
198             }
199             if (scheduledTask.isCancelled()) {
200                 continue;
201             }
202             if (!taskQueue.offer(scheduledTask)) {
203                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
204                 scheduledTaskQueue.add(scheduledTask);
205                 return false;
206             }
207         }
208     }
209 
210     /**
211      * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
212      * You should use {@link #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}.
213      */
214     protected final Runnable pollScheduledTask(long nanoTime) {
215         assert inEventLoop();
216 
217         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
218         if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
219             return null;
220         }
221         scheduledTaskQueue.remove();
222         scheduledTask.setConsumed();
223         return scheduledTask;
224     }
225 
226     /**
227      * Return the nanoseconds until the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
228      */
229     protected final long nextScheduledTaskNano() {
230         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
231         return scheduledTask != null ? scheduledTask.delayNanos() : -1;
232     }
233 
234     /**
235      * Return the deadline (in nanoseconds) when the next scheduled task is ready to be run or {@code -1}
236      * if no task is scheduled.
237      */
238     protected final long nextScheduledTaskDeadlineNanos() {
239         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
240         return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
241     }
242 
243     final ScheduledFutureTask<?> peekScheduledTask() {
244         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
245         return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
246     }
247 
248     /**
249      * Returns {@code true} if a scheduled task is ready for processing.
250      */
251     protected final boolean hasScheduledTasks() {
252         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
253         return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
254     }
255 
256     @Override
257     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
258         ObjectUtil.checkNotNull(command, "command");
259         ObjectUtil.checkNotNull(unit, "unit");
260         if (delay < 0) {
261             delay = 0;
262         }
263         validateScheduled0(delay, unit);
264 
265         return schedule(new ScheduledFutureTask<Void>(
266                 this,
267                 command,
268                 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
269     }
270 
271     @Override
272     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
273         ObjectUtil.checkNotNull(callable, "callable");
274         ObjectUtil.checkNotNull(unit, "unit");
275         if (delay < 0) {
276             delay = 0;
277         }
278         validateScheduled0(delay, unit);
279 
280         return schedule(new ScheduledFutureTask<V>(
281                 this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
282     }
283 
284     @Override
285     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
286         ObjectUtil.checkNotNull(command, "command");
287         ObjectUtil.checkNotNull(unit, "unit");
288         if (initialDelay < 0) {
289             throw new IllegalArgumentException(
290                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
291         }
292         if (period <= 0) {
293             throw new IllegalArgumentException(
294                     String.format("period: %d (expected: > 0)", period));
295         }
296         validateScheduled0(initialDelay, unit);
297         validateScheduled0(period, unit);
298 
299         return schedule(new ScheduledFutureTask<Void>(
300                 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
301     }
302 
303     @Override
304     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
305         ObjectUtil.checkNotNull(command, "command");
306         ObjectUtil.checkNotNull(unit, "unit");
307         if (initialDelay < 0) {
308             throw new IllegalArgumentException(
309                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
310         }
311         if (delay <= 0) {
312             throw new IllegalArgumentException(
313                     String.format("delay: %d (expected: > 0)", delay));
314         }
315 
316         validateScheduled0(initialDelay, unit);
317         validateScheduled0(delay, unit);
318 
319         return schedule(new ScheduledFutureTask<Void>(
320                 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
321     }
322 
323     @SuppressWarnings("deprecation")
324     private void validateScheduled0(long amount, TimeUnit unit) {
325         validateScheduled(amount, unit);
326     }
327 
328     /**
329      * Sub-classes may override this to restrict the maximal amount of time someone can use to schedule a task.
330      *
331      * @deprecated will be removed in the future.
332      */
333     @Deprecated
334     protected void validateScheduled(long amount, TimeUnit unit) {
335         // NOOP
336     }
337 
338     final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
339         // nextTaskId a long and so there is no chance it will overflow back to 0
340         if (task.getId() == 0L) {
341             task.setId(++nextTaskId);
342         }
343         scheduledTaskQueue().add(task);
344     }
345 
346     private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
347         if (inEventLoop()) {
348             scheduleFromEventLoop(task);
349         } else {
350             final long deadlineNanos = task.deadlineNanos();
351             // task will add itself to scheduled task queue when run if not expired
352             if (beforeScheduledTaskSubmitted(deadlineNanos)) {
353                 execute(task);
354             } else {
355                 lazyExecute(task);
356                 // Second hook after scheduling to facilitate race-avoidance
357                 if (afterScheduledTaskSubmitted(deadlineNanos)) {
358                     execute(WAKEUP_TASK);
359                 }
360             }
361         }
362 
363         return task;
364     }
365 
366     final void removeScheduled(final ScheduledFutureTask<?> task) {
367         assert task.isCancelled();
368         if (inEventLoop()) {
369             scheduledTaskQueue().removeTyped(task);
370         } else {
371             // task will remove itself from scheduled task queue when it runs
372             scheduleRemoveScheduled(task);
373         }
374     }
375 
376     void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
377         // task will remove itself from scheduled task queue when it runs
378         lazyExecute(task);
379     }
380 
381     /**
382      * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission.
383      * Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to
384      * process the scheduled task (if not already awake).
385      * <p>
386      * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with
387      * the same value <i>after</i> the scheduled task is enqueued, providing another opportunity
388      * to wake the {@link EventExecutor} thread if required.
389      *
390      * @param deadlineNanos deadline of the to-be-scheduled task
391      *     relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
392      * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
393      */
394     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
395         return true;
396     }
397 
398     /**
399      * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
400      *
401      * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
402      * @return  {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
403      */
404     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
405         return true;
406     }
407 }