View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.DefaultPriorityQueue;
19  import io.netty5.util.internal.PriorityQueue;
20  import io.netty5.util.internal.PriorityQueueNode;
21  
22  import java.util.Comparator;
23  import java.util.Queue;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.TimeUnit;
26  
27  import static java.util.Objects.requireNonNull;
28  import static java.util.concurrent.Executors.callable;
29  
30  /**
31   * Abstract base class for {@link EventExecutor}s that want to support scheduling.
32   */
33  public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    // Value of System.nanoTime() captured when this class is initialized; defaultCurrentTimeNanos() subtracts it
    // so that the executor clock is expressed relative to this instant.
    static final long START_TIME = System.nanoTime();

    // Orders queued nodes by their own compareTo implementation.
    private static final Comparator<RunnableScheduledFutureNode<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
            Comparable::compareTo;
    // Shared zero-length array passed to toArray(...) when the queue is snapshotted in cancelScheduledTasks().
    private static final RunnableScheduledFutureNode<?>[]
            EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES = new RunnableScheduledFutureNode<?>[0];

    // Created lazily by scheduledTaskQueue(); stays null until that method is called for the first time.
    private PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue;
42  
    /**
     * Creates a new instance. The scheduled-task queue is not allocated here; it is created on demand by
     * {@code scheduledTaskQueue()}.
     */
    protected AbstractScheduledEventExecutor() {
    }
45  
46      /**
47       * The time elapsed since initialization of this class in nanoseconds. This may return a negative number just like
48       * {@link System#nanoTime()}.
49       */
50      public static long nanoTime() {
51          return defaultCurrentTimeNanos();
52      }
53  
    /**
     * The initial value used for delay and computations based upon a monotonic time source.
     *
     * @return the {@link System#nanoTime()} reading captured when this class was initialized.
     */
    protected static long initialNanoTime() {
        return START_TIME;
    }
61  
62      /**
63       * Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()}
64       * for two reasons:
65       *
66       * <ul>
67       *     <li>We apply a fixed offset to the {@link System#nanoTime() nanoTime}</li>
68       *     <li>Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time
69       *     for testing purposes.</li>
70       * </ul>
71       */
72      protected long getCurrentTimeNanos() {
73          return defaultCurrentTimeNanos();
74      }
75  
76      static long defaultCurrentTimeNanos() {
77          return System.nanoTime() - START_TIME;
78      }
79  
80      static long deadlineNanos(long nanoTime, long delay) {
81          long deadlineNanos = nanoTime + delay;
82          // Guard against overflow
83          return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
84      }
85  
86      PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue() {
87          if (scheduledTaskQueue == null) {
88              scheduledTaskQueue = new DefaultPriorityQueue<>(
89                      SCHEDULED_FUTURE_TASK_COMPARATOR,
90                      // Use same initial capacity as java.util.PriorityQueue
91                      11);
92          }
93          return scheduledTaskQueue;
94      }
95  
96      private static boolean isNullOrEmpty(Queue<RunnableScheduledFutureNode<?>> queue) {
97          return queue == null || queue.isEmpty();
98      }
99  
100     /**
101      * Cancel all scheduled tasks.
102      * <p>
103      * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
104      */
105     protected final void cancelScheduledTasks() {
106         assert inEventLoop();
107         PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
108         if (isNullOrEmpty(scheduledTaskQueue)) {
109             return;
110         }
111 
112         final RunnableScheduledFutureNode<?>[] scheduledTasks =
113                 scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES);
114 
115         for (RunnableScheduledFutureNode<?> task : scheduledTasks) {
116             task.cancel();
117         }
118 
119         scheduledTaskQueue.clearIgnoringIndexes();
120     }
121 
122     /**
123      * @see #pollScheduledTask(long)
124      */
125     protected final RunnableScheduledFuture<?> pollScheduledTask() {
126         return pollScheduledTask(getCurrentTimeNanos());
127     }
128 
129     /**
130      * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link
131      * #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}.
132      * <p>
133      * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
134      */
135     protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {
136         assert inEventLoop();
137 
138         Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
139         RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
140         if (scheduledTask == null) {
141             return null;
142         }
143 
144         if (scheduledTask.deadlineNanos() <= nanoTime) {
145             scheduledTaskQueue.remove();
146             return scheduledTask;
147         }
148         return null;
149     }
150 
151     /**
152      * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
153      * <p>
154      * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
155      */
156     protected final long nextScheduledTaskNano() {
157         Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
158         RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
159         if (scheduledTask == null) {
160             return -1;
161         }
162         return Math.max(0, scheduledTask.deadlineNanos() - getCurrentTimeNanos());
163     }
164 
165     final RunnableScheduledFuture<?> peekScheduledTask() {
166         Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
167         if (scheduledTaskQueue == null) {
168             return null;
169         }
170         return scheduledTaskQueue.peek();
171     }
172 
173     /**
174      * Returns {@code true} if a scheduled task is ready for processing.
175      * <p>
176      * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
177      */
178     protected final boolean hasScheduledTasks() {
179         assert inEventLoop();
180         Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
181         RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
182         return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
183     }
184 
185     @Override
186     public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) {
187         requireNonNull(command, "command");
188         requireNonNull(unit, "unit");
189         if (delay < 0) {
190             delay = 0;
191         }
192         RunnableScheduledFuture<Void> task = newScheduledTaskFor(
193                 callable(command, null), deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0);
194         return schedule(task);
195     }
196 
197     @Override
198     public <V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
199         requireNonNull(callable, "callable");
200         requireNonNull(unit, "unit");
201         if (delay < 0) {
202             delay = 0;
203         }
204         RunnableScheduledFuture<V> task = newScheduledTaskFor(
205                 callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0);
206         return schedule(task);
207     }
208 
209     @Override
210     public Future<Void> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
211         requireNonNull(command, "command");
212         requireNonNull(unit, "unit");
213         if (initialDelay < 0) {
214             throw new IllegalArgumentException(
215                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
216         }
217         if (period <= 0) {
218             throw new IllegalArgumentException(
219                     String.format("period: %d (expected: > 0)", period));
220         }
221 
222         RunnableScheduledFuture<Void> task = newScheduledTaskFor(
223                 callable(command, null),
224                 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period));
225         return schedule(task);
226     }
227 
228     @Override
229     public Future<Void> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
230         requireNonNull(command, "command");
231         requireNonNull(unit, "unit");
232         if (initialDelay < 0) {
233             throw new IllegalArgumentException(
234                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
235         }
236         if (delay <= 0) {
237             throw new IllegalArgumentException(
238                     String.format("delay: %d (expected: > 0)", delay));
239         }
240 
241         RunnableScheduledFuture<Void> task = newScheduledTaskFor(
242                 callable(command, null),
243                 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay));
244         return schedule(task);
245     }
246 
247     /**
248      * Add the {@link RunnableScheduledFuture} for execution.
249      */
250     protected final <V> Future<V> schedule(final RunnableScheduledFuture<V> task) {
251         if (inEventLoop()) {
252             add0(task);
253         } else {
254             execute(() -> add0(task));
255         }
256         return task;
257     }
258 
259     private <V> void add0(RunnableScheduledFuture<V> task) {
260         final RunnableScheduledFutureNode<V> node;
261         if (task instanceof RunnableScheduledFutureNode) {
262             node = (RunnableScheduledFutureNode<V>) task;
263         } else {
264             node = new DefaultRunnableScheduledFutureNode<>(task);
265         }
266         scheduledTaskQueue().add(node);
267     }
268 
269     final void removeScheduled(final RunnableScheduledFutureNode<?> task) {
270         if (inEventLoop()) {
271             scheduledTaskQueue().removeTyped(task);
272         } else {
273             execute(() -> removeScheduled(task));
274         }
275     }
276 
277     /**
278      * Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}.
279      * <p>
280      * This can be used if you want to override {@link #newScheduledTaskFor(Callable, long, long)} and return a
281      * different {@link RunnableFuture}.
282      */
283     protected static <V> RunnableScheduledFuture<V> newRunnableScheduledFuture(
284             AbstractScheduledEventExecutor executor, Promise<V> promise, Callable<V> task,
285             long deadlineNanos, long periodNanos) {
286         return new RunnableScheduledFutureAdapter<>(executor, promise, task, deadlineNanos, periodNanos);
287     }
288 
289     /**
290      * Returns a {@code RunnableScheduledFuture} for the given values.
291      */
292     protected <V> RunnableScheduledFuture<V> newScheduledTaskFor(
293             Callable<V> callable, long deadlineNanos, long period) {
294         return newRunnableScheduledFuture(this, newPromise(), callable, deadlineNanos, period);
295     }
296 
    /**
     * A {@link RunnableScheduledFuture} that is also a {@link PriorityQueueNode}; this is the element type of the
     * scheduled-task {@link PriorityQueue} used by this executor.
     */
    interface RunnableScheduledFutureNode<V> extends PriorityQueueNode, RunnableScheduledFuture<V> {
    }
299 
300     private static final class DefaultRunnableScheduledFutureNode<V> implements RunnableScheduledFutureNode<V> {
301         private final RunnableScheduledFuture<V> future;
302         private int queueIndex = INDEX_NOT_IN_QUEUE;
303 
304         DefaultRunnableScheduledFutureNode(RunnableScheduledFuture<V> future) {
305             this.future = future;
306         }
307 
308         @Override
309         public EventExecutor executor() {
310             return future.executor();
311         }
312 
313         @Override
314         public long deadlineNanos() {
315             return future.deadlineNanos();
316         }
317 
318         @Override
319         public long delayNanos() {
320             return future.delayNanos();
321         }
322 
323         @Override
324         public long delayNanos(long currentTimeNanos) {
325             return future.delayNanos(currentTimeNanos);
326         }
327 
328         @Override
329         public RunnableScheduledFuture<V> addListener(FutureListener<? super V> listener) {
330             future.addListener(listener);
331             return this;
332         }
333 
334         @Override
335         public <C> RunnableScheduledFuture<V> addListener(
336                 C context, FutureContextListener<? super C, ? super V> listener) {
337             future.addListener(context, listener);
338             return this;
339         }
340 
341         @Override
342         public boolean isPeriodic() {
343             return future.isPeriodic();
344         }
345 
346         @Override
347         public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
348             return queueIndex;
349         }
350 
351         @Override
352         public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
353             queueIndex = i;
354         }
355 
356         @Override
357         public void run() {
358             future.run();
359         }
360 
361         @Override
362         public boolean cancel() {
363             return future.cancel();
364         }
365 
366         @Override
367         public boolean isCancelled() {
368             return future.isCancelled();
369         }
370 
371         @Override
372         public boolean isDone() {
373             return future.isDone();
374         }
375 
376         @Override
377         public FutureCompletionStage<V> asStage() {
378             return future.asStage();
379         }
380 
381         @Override
382         public int compareTo(RunnableScheduledFuture<?> o) {
383             return future.compareTo(o);
384         }
385 
386         @Override
387         public boolean isSuccess() {
388             return future.isSuccess();
389         }
390 
391         @Override
392         public boolean isFailed() {
393             return future.isFailed();
394         }
395 
396         @Override
397         public boolean isCancellable() {
398             return future.isCancellable();
399         }
400 
401         @Override
402         public Throwable cause() {
403             return future.cause();
404         }
405 
406         @Override
407         public V getNow() {
408             return future.getNow();
409         }
410     }
411 }